יצירת שאילתות מתמשכות

במאמר הזה מוסבר איך להריץ שאילתה רציפה ב-BigQuery.

שאילתות מתמשכות ב-BigQuery הן הצהרות SQL שמופעלות באופן רציף. שאילתות רציפות מאפשרות לכם לנתח נתונים נכנסים ב-BigQuery בזמן אמת, ואז לייצא את התוצאות ל-Bigtable, ל-Pub/Sub או ל-Spanner, או לכתוב את התוצאות בטבלה ב-BigQuery.

בחירת סוג החשבון

אפשר ליצור ולהריץ משימת שאילתה רציפה באמצעות חשבון משתמש, או ליצור משימת שאילתה רציפה באמצעות חשבון משתמש ואז להריץ אותה באמצעות חשבון שירות. כדי להריץ שאילתה מתמשכת שמייצאת תוצאות לנושא Pub/Sub, צריך להשתמש בחשבון שירות.

כשמשתמשים בחשבון משתמש, שאילתה מתמשכת פועלת עד יומיים. כשמשתמשים בחשבון שירות, שאילתה מתמשכת פועלת עד 150 ימים. מידע נוסף זמין במאמר בנושא הרשאות.

ההרשאות הנדרשות

בקטע הזה מוסבר אילו הרשאות נדרשות כדי ליצור ולהריץ שאילתה מתמשכת. לחלופין, במקום להשתמש בתפקידים שצוינו ב-IAM (הפלטפורמה לניהול זהויות והרשאות גישה), אפשר לקבל את ההרשאות הנדרשות באמצעות תפקידים בהתאמה אישית.

הרשאות כשמשתמשים בחשבון משתמש

בקטע הזה מוסבר על התפקידים וההרשאות שנדרשים כדי ליצור ולהריץ שאילתה מתמשכת באמצעות חשבון משתמש.

כדי ליצור משימה ב-BigQuery, לחשבון המשתמש צריכה להיות הרשאת IAM‏ bigquery.jobs.create. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.jobs.create:

כדי לייצא נתונים מטבלה ב-BigQuery, לחשבון המשתמש צריכה להיות הרשאת IAM‏ bigquery.tables.export. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.tables.export:

כדי לעדכן נתונים בטבלה ב-BigQuery, לחשבון המשתמש צריכה להיות הרשאת ה-IAM‏ bigquery.tables.updateData. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.tables.updateData:

אם חשבון המשתמש צריך להפעיל את ממשקי ה-API שנדרשים לתרחיש השימוש שלכם בשאילתה מתמשכת, לחשבון המשתמש צריך להיות התפקיד אדמין של Service Usage (roles/serviceusage.serviceUsageAdmin).

הרשאות כשמשתמשים בחשבון שירות

בקטע הזה מפורטים התפקידים וההרשאות שנדרשים לחשבון המשתמש שיוצר את השאילתה המתמשכת ולחשבון השירות שמריץ את השאילתה המתמשכת.

הרשאות בחשבון משתמש

כדי ליצור משימה ב-BigQuery, לחשבון המשתמש צריכה להיות הרשאת IAM‏ bigquery.jobs.create. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.jobs.create:

כדי לשלוח עבודה שמופעלת באמצעות חשבון שירות, לחשבון המשתמש צריך להיות התפקיד משתמש בחשבון שירות (roles/iam.serviceAccountUser). אם אתם משתמשים באותו חשבון משתמש כדי ליצור את חשבון השירות, לחשבון המשתמש צריך להיות התפקיד אדמין של חשבון שירות (roles/iam.serviceAccountAdmin). במאמר הקצאת תפקיד יחיד מוסבר איך להגביל את הגישה של משתמש לחשבון שירות יחיד, ולא לכל חשבונות השירות בפרויקט.

אם חשבון המשתמש צריך להפעיל את ממשקי ה-API שנדרשים לתרחיש השימוש שלכם בשאילתה מתמשכת, לחשבון המשתמש צריך להיות התפקיד אדמין של Service Usage (roles/serviceusage.serviceUsageAdmin).

הרשאות של חשבון שירות

כדי לייצא נתונים מטבלה ב-BigQuery, לחשבון השירות צריכה להיות הרשאת IAM‏ bigquery.tables.export. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.tables.export:

כדי לעדכן נתונים בטבלת BigQuery, לחשבון השירות צריכה להיות הרשאת ה-IAM‏ bigquery.tables.updateData. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.tables.updateData:

לפני שמתחילים

  1. בדף לבחירת הפרויקט במסוף Google Cloud , בוחרים פרויקט ב- Google Cloud או יוצרים אותו.

    תפקידים שנדרשים כדי לבחור או ליצור פרויקט

    • Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
    • יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (roles/resourcemanager.projectCreator), שכולל את ההרשאה resourcemanager.projects.create. איך מקצים תפקידים

    כניסה לדף לבחירת הפרויקט

  2. מוודאים שהחיוב מופעל בפרויקט Google Cloud .

  3. מפעילים את BigQuery API.

    תפקידים שנדרשים להפעלת ממשקי API

    כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

    להפעלת ה-API

יצירת בקשה לשמירת מקום

יוצרים הזמנה למהדורת Enterprise או Enterprise Plus, ואז יוצרים הקצאת הזמנה עם סוג עבודה CONTINUOUS. אפשר להשתמש במקום השמור הזה בהתאמה אוטומטית לעומס ובשיתוף יחידות קיבולת פנויות. יש מגבלות על הזמנות שחלות על הקצאות של הזמנות לשאילתות רציפות.

ייצוא ל-Pub/Sub

כדי לייצא נתונים ל-Pub/Sub, צריך להשתמש בממשקי API נוספים, בהרשאות IAM וב Google Cloud משאבים. מידע נוסף זמין במאמר בנושא ייצוא ל-Pub/Sub.

עיבוד מוטציות באמצעות CHANGES

כשמייצאים נתונים ל-Pub/Sub, אפשר להשתמש בפונקציית היסטוריית השינויים CHANGES. הפונקציה CHANGES מעבדת את כל השורות ששונו בטבלת המקור, כולל הוספות ושינויים.

הטמעה של מאפיינים מותאמים אישית כמטא-נתונים בהודעות Pub/Sub

אתם יכולים להשתמש במאפייני Pub/Sub כדי לספק מידע נוסף על ההודעה, כמו העדיפות, המקור, היעד או מטא-נתונים נוספים. אפשר גם להשתמש במאפיינים כדי לסנן הודעות במינוי.

בתוצאה של שאילתה מתמשכת, אם שם העמודה הוא _ATTRIBUTES, הערכים שלה מועתקים למאפייני ההודעה ב-Pub/Sub. השדות שצוינו ב-_ATTRIBUTES משמשים כמפתחות מאפיינים.

העמודה _ATTRIBUTES צריכה להיות מסוג JSON, בפורמט ARRAY<STRUCT<STRING, STRING>> או STRUCT<STRING>.

דוגמה מופיעה במאמר בנושא ייצוא נתונים לנושא Pub/Sub.

ייצוא אל Bigtable

כדי לייצא נתונים ל-Bigtable, צריך להשתמש בממשקי API נוספים, בהרשאות IAM ובמשאבים Google Cloud נוספים. מידע נוסף זמין במאמר בנושא ייצוא ל-Bigtable.

ייצוא ל-Spanner

כדי לייצא נתונים ל-Spanner, צריך ממשקי API נוספים, הרשאות IAM ומשאבים. Google Cloud מידע נוסף זמין במאמר בנושא ייצוא ל-Spanner (תהליך ETL הפוך).

כתיבת נתונים לטבלה ב-BigQuery

אפשר לכתוב נתונים לטבלה ב-BigQuery באמצעות הצהרת INSERT.

שימוש בפונקציות AI

כדי להשתמש בפונקציית AI נתמכת בשאילתה מתמשכת, נדרשים ממשקי API נוספים, הרשאות IAM ומשאבים. Google Cloudלמידע נוסף, אפשר לעיין באחד מהנושאים הבאים, בהתאם לתרחיש השימוש:

כשמשתמשים בפונקציית AI בשאילתה מתמשכת, צריך לבדוק אם הפלט של השאילתה יישאר במסגרת המכסה של הפונקציה. אם חורגים מהמכסה, יכול להיות שיהיה צורך לטפל בנפרד ברשומות שלא עוברות עיבוד.

מציינים שעת התחלה לשאילתה הרציפה

במקרה של ייצוא ל-Pub/Sub, צריך להשתמש בפונקציה APPENDS או בפונקציה CHANGES בסעיף FROM של שאילתה מתמשכת כדי לציין את הנתונים המוקדמים ביותר לעיבוד. לדוגמה, APPENDS(TABLE my_table, start_timestamp).

הארגומנט start_timestamp מגדיר את הנקודה בזמן שבה השאילתה המתמשכת מתחילה לעבד נתונים. לדוגמה, APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) אומר ל-BigQuery לעבד נתונים שנוספו לטבלה my_table לכל היותר 10 דקות לפני תחילת השאילתה המתמשכת. נתונים שנוספים ל-my_table לאחר מכן מעובדים כשהם מתקבלים. לא מוטל עיכוב על עיבוד הנתונים.

כשמציינים את הארגומנט start_timestamp, הערך חייב להיות בטווח של חלון הזמן של הטבלה, שמוגדר כברירת מחדל לשבעה ימים בטבלאות רגילות. הגדרת start_timestamp ל-NULL תגרום לכך שברירת המחדל תהיה זמן היצירה של הטבלה. לא מומלץ להשתמש בערך NULL, כי אם הטבלה נוצרה לפני חלון הזמן של האפשרות 'חזרה בזמן', המערכת תחזיר שגיאה. שאילתה באמצעות NULL עשויה להצליח בטבלה שנוצרה לאחרונה, אבל להיכשל בהמשך אם חותמת הזמן של יצירת הטבלה תהיה מחוץ לחלון של שבעה ימים.

אל תספקו ארגומנט end_timestamp לפונקציה APPENDS כשמשתמשים בה בשאילתה מתמשכת.

בדוגמה הבאה מוצג איך להתחיל שאילתה מתמשכת מנקודת זמן מסוימת באמצעות הפונקציה APPENDS, כשמריצים שאילתה על טבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בסטרימינג:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE
    ride_status = 'enroute');

מציינים נקודת התחלה מוקדמת יותר מחלון הזמן של הנסיעה

כדי לכלול נתונים שלא נכללים בחלון הזמן של שבעה ימים של 'חזרה בזמן', צריך להשתמש בשאילתה רגילה כדי למלא את החוסרים בנתונים עד לנקודת זמן מסוימת, ואז להפעיל שאילתה מתמשכת מאותה נקודה. חותמת הזמן של ה'העברה' צריכה להיות בטווח של שבעה ימים, כדי שהשאילתה המתמשכת תוכל להמשיך מהמקום שבו השאילתה הרגילה הסתיימה.

בדוגמה הבאה מוצג איך למלא מחדש נתונים ישנים מטבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בזמן אמת, ואז לעבור לשאילתה רציפה.

  1. מריצים שאילתה רגילה כדי למלא נתונים חסרים עד לנקודת זמן מסוימת:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM `myproject.real_time_taxi_streaming.taxirides`
      -- Include all data inserted into the table up to this handoff point.
      -- This handoff timestamp must be within the time travel window.
      FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC'
    WHERE
      ride_status = 'dropoff';
  2. להריץ שאילתה מתמשכת מהנקודה בזמן שבה השאילתה נעצרה:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM
      APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to start processing
        -- data right where the batch query left off + 1 microsecond.
        -- This timestamp must be within the time travel window.
        TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND)
    WHERE
      ride_status = 'dropoff';

הרצת שאילתה מתמשכת באמצעות חשבון משתמש

בקטע הזה מוסבר איך להריץ שאילתה מתמשכת באמצעות חשבון משתמש. אחרי שהשאילתה המתמשכת פועלת, אפשר לסגור את Google Cloud המסוף, את חלון הטרמינל או את האפליקציה בלי להפריע להרצת השאילתה. שאילתה מתמשכת שמופעלת על ידי חשבון משתמש פועלת למשך יומיים לכל היותר ואז נעצרת באופן אוטומטי. כדי להמשיך לעבד נתונים חדשים שמגיעים, צריך להתחיל שאילתה מתמשכת חדשה ולהגדיר נקודת התחלה. כדי לבצע את התהליך הזה אוטומטית, אפשר לעיין במאמר בנושא ניסיון חוזר של שאילתות שנכשלו.

כדי להריץ שאילתה מתמשכת:

המסוף

  1. במסוף Google Cloud , עוברים לדף BigQuery.

    כניסה ל-BigQuery

  2. בעורך השאילתות, לוחצים על עריכה > מצב שאילתה > שאילתה מתמשכת. כדי לאשר את הבחירה, לוחצים על אישור.

  3. אופציונלי: כדי לשלוט במשך הזמן שהשאילתה פועלת, לוחצים על עריכה > הגדרות שאילתה ומגדירים את הזמן הקצוב לתפוגה של העבודה באלפיות השנייה.

  4. בעורך השאילתות, מקלידים את הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות.

  5. לוחצים על Run.

BQ

  1. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  2. ב-Cloud Shell, מריצים את השאילתה המתמשכת באמצעות הפקודה bq query עם הדגל --continuous:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    מחליפים את QUERY בהצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות. אפשר לקבוע כמה זמן השאילתה תפעל באמצעות הדגל --job_timeout_ms.

API

מריצים את השאילתה המתמשכת על ידי קריאה ל-method‏ jobs.insert. צריך להגדיר את השדה continuous לערך true ב-JobConfigurationQuery של משאב Job שמעבירים. אפשר גם לשלוט באורך הזמן שבו השאילתה פועלת באמצעות הגדרת השדה jobTimeoutMs.

curl --request POST \
  "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
  --header "Authorization: Bearer $(gcloud auth print-access-token)" \
  --header "Content-Type: application/json; charset=utf-8" \
  --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \
  --compressed

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט.
  • QUERY: הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות.

הרצת שאילתה מתמשכת באמצעות חשבון שירות

בקטע הזה מוסבר איך להריץ שאילתה מתמשכת באמצעות חשבון שירות. אחרי שהשאילתה המתמשכת פועלת, אפשר לסגור את Google Cloud המסוף, את חלון הטרמינל או את האפליקציה בלי להפריע להרצת השאילתה. שאילתה מתמשכת שמופעלת באמצעות חשבון שירות יכולה לפעול עד 150 ימים ואז היא נעצרת באופן אוטומטי. כדי להמשיך לעבד נתונים חדשים שמגיעים, צריך להתחיל שאילתה מתמשכת חדשה ולהגדיר נקודת התחלה. כדי לבצע את התהליך הזה אוטומטית, אפשר לעיין במאמר בנושא ניסיון חוזר של שאילתות שנכשלו.

בצע את השלבים הבאים כדי להשתמש בחשבון שירות להרצת שאילתה מתמשכת:

המסוף

  1. יוצרים חשבון שירות.
  2. מעניקים לחשבון השירות את ההרשאות הנדרשות.
  3. במסוף Google Cloud , עוברים לדף BigQuery.

    כניסה ל-BigQuery

  4. בעורך השאילתות, לוחצים על עריכה > מצב שאילתה > שאילתה מתמשכת. כדי לאשר את הבחירה, לוחצים על אישור.

  5. בעורך השאילתות, לוחצים על Edit (עריכה) > Query settings (הגדרות השאילתה).

  6. בקטע שאילתה מתמשכת, משתמשים בתיבה Service account כדי לבחור את חשבון השירות שיצרתם.

  7. אופציונלי: כדי לקבוע כמה זמן השאילתה תפעל, מגדירים את הזמן הקצוב לתפוגה של העבודה באלפיות שנייה.

  8. לוחצים על Save.

  9. בעורך השאילתות, מקלידים את הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות.

  10. לוחצים על Run.

BQ

  1. יוצרים חשבון שירות.
  2. מעניקים לחשבון השירות את ההרשאות הנדרשות.
  3. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

    בחלק התחתון של Google Cloud המסוף יתחיל סשן של Cloud Shell ותופיע הודעה של שורת הפקודה. Cloud Shell היא סביבת מעטפת שבה ה-CLI של Google Cloud מותקן ומוגדרים ערכים לפרויקט הקיים. הסשן יופעל תוך כמה שניות.

  4. בשורת הפקודה, מריצים את השאילתה המתמשכת באמצעות הפקודה bq query עם הדגלים הבאים:

    • מגדירים את הדגל --continuous לערך true כדי שהשאילתה תהיה רציפה.
    • משתמשים בדגל --connection_property כדי לציין חשבון שירות לשימוש.
    • אופציונלי: מגדירים את הדגל --job_timeout_ms כדי להגביל את זמן הריצה של השאילתה.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • SERVICE_ACCOUNT_EMAIL: כתובת האימייל בחשבון השירות. אפשר למצוא את כתובת האימייל בחשבון השירות בדף Service accounts במסוף Google Cloud .
    • QUERY: הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות.

API

  1. יוצרים חשבון שירות.
  2. מעניקים לחשבון השירות את ההרשאות הנדרשות.
  3. מריצים את השאילתה המתמשכת על ידי קריאה ל-method‏ jobs.insert. מגדירים את השדות הבאים במשאב JobConfigurationQuery של משאב Job שמעבירים:

    • מגדירים את השדה continuous לערך true כדי שהשאילתה תפעל ברציפות.
    • משתמשים בשדה connectionProperties כדי לציין חשבון שירות לשימוש.

    אפשר גם לשלוט באורך הזמן שבו השאילתה פועלת על ידי הגדרת השדה jobTimeoutMs במשאב JobConfiguration.

    curl --request POST \
      "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
      --header "Authorization: Bearer $(gcloud auth print-access-token)" \
      --header "Content-Type: application/json; charset=utf-8" \
      --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \
      --compressed

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • QUERY: הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות.
    • SERVICE_ACCOUNT_EMAIL: כתובת האימייל בחשבון השירות. אפשר למצוא את כתובת האימייל בחשבון השירות בדף Service accounts במסוף Google Cloud .

יצירת מזהה משרה בהתאמה אישית

לכל עבודת שאילתה מוקצה מזהה עבודה שאפשר להשתמש בו כדי לחפש את העבודה ולנהל אותה. כברירת מחדל, מזהי המשימות נוצרים באופן אקראי. כדי להקל על החיפוש של מזהה המשימה של שאילתה מתמשכת באמצעות היסטוריית המשימות או כלי לבדיקת משימות, אפשר להקצות קידומת מותאמת אישית למזהה המשימה:

  1. במסוף Google Cloud , עוברים לדף BigQuery.

    כניסה ל-BigQuery

  2. בעורך השאילתות, לוחצים על עריכה > מצב שאילתה > שאילתה מתמשכת.

  3. לוחצים על אישור.

  4. בעורך השאילתות, לוחצים על Edit (עריכה) > Query settings (הגדרות השאילתה).

  5. בקטע קידומת מותאמת אישית למזהה המשרה, מזינים קידומת מותאמת אישית לשם.

  6. לוחצים על Save.

עיבוד עם שמירת מצב (stateful) באמצעות JOINs וצבירות של חלונות

פעולות עם שמירת מצב מאפשרות לביצוע שאילתות רציפות ניתוח מורכב על ידי שמירת מידע בכמה שורות או מרווחי זמן. הפעולות האלה כוללות JOINs וצבירות של חלונות.

מידע מפורט על השימוש בפעולות האלה עם שמירת מצב זמין בנושאים הבאים:

דוגמאות

בדוגמאות הבאות של SQL מוצגים תרחישי שימוש נפוצים בשאילתות רציפות.

ייצוא נתונים לנושא Pub/Sub

בדוגמה הבאה מוצגת שאילתה מתמשכת שמסננת נתונים מטבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בסטרימינג, ומפרסמת את הנתונים של נסיעות שבוטלו בנושא Pub/Sub בזמן אמת עם מאפייני הודעה:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_comment AS STRING) AS passenger_comment))
  FROM
    CHANGES(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the CHANGES TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE _CHANGE_TYPE = 'DELETE'
);

ייצוא נתונים לטבלת Bigtable

בדוגמה הבאה מוצגת שאילתה מתמשכת שמסננת נתונים מטבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בסטרימינג, ומייצאת את הנתונים לטבלה ב-Bigtable בזמן אמת:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

ייצוא נתונים לטבלת Spanner

בדוגמה הבאה מוצגת שאילתה מתמשכת שמסננת נתונים מטבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בסטרימינג, ואז מייצאת את הנתונים לטבלה ב-Spanner בזמן אמת:

EXPORT DATA
 OPTIONS (
   format = 'CLOUD_SPANNER',
   uri = 'https://spanner.googleapis.com/projects/myproject/instances/myspannerinstance/databases/taxi-real-time-rides',
   spanner_options ="""{
      "table": "rides",
      -- To ensure data is written to Spanner in the correct sequence
      -- during a continuous export, use the change_timestamp_column
      -- option. This should be mapped to a timestamp column from your
      -- BigQuery data. If your source data lacks a timestamp, the
      -- _CHANGE_TIMESTAMP pseudocolumn provided by the APPENDS function
      -- will be automatically mapped to the "change_timestamp" column.
      "change_timestamp_column": "change_timestamp"
   }"""
  )
  AS (
  SELECT
    ride_id,
    latitude,
    longitude,
    meter_reading,
    ride_status,
    passenger_count,
    _CHANGE_TIMESTAMP as change_timestamp
  FROM APPENDS(
        TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to specify when you want to
        -- start processing data using your continuous query.
        -- This example starts processing at 10 minutes before the current time.
        CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
  );

כתיבת נתונים לטבלה ב-BigQuery

בדוגמה הבאה מוצגת שאילתה מתמשכת שמסננת ומשנה נתונים מטבלה ב-BigQuery שמקבלת מידע בסטרימינג על נסיעות במונית, ואז כותבת את הנתונים לטבלה אחרת ב-BigQuery בזמן אמת. כך הנתונים זמינים לניתוח נוסף בהמשך.

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM
  APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
    -- Configure the APPENDS TVF start_timestamp to specify when you want to
    -- start processing data using your continuous query.
    -- This example starts processing at 10 minutes before the current time.
    CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
WHERE
  ride_status = 'dropoff';

עיבוד נתונים באמצעות מודל של Gemini Enterprise Agent Platform

בדוגמה הבאה מוצגת שאילתה רציפה שמשתמשת במודל של Agent Platform כדי ליצור מודעה לנוסעים במוניות על סמך קו הרוחב וקו האורך הנוכחיים שלהם, ואז מייצאת את התוצאות לנושא Pub/Sub בזמן אמת:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        result)) AS message
  FROM
    AI.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM
          APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
            -- Configure the APPENDS TVF start_timestamp to specify when you
            -- want to start processing data using your continuous query.
            -- This example starts processing at 10 minutes before the current time.
            CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p))
      AS ml_output
);

ביצוע צבירות של JOINs וחלונות

בדוגמה הבאה מוצגת שאילתה מתמשכת שמבצעת צבירות של JOIN וחלונות.

נניח שאתם רוצים לצרף טבלה של נסיעות במונית לטבלה של בקשות למונית כדי להבין את מצב המוניות בכל שכונה כל חמש דקות. באמצעות פונקציות מצטברות, אפשר לתעד את נפח הביקוש למוניות בכל שכונה, ואת המרחק המינימלי, המקסימלי, הממוצע וסטיית התקן של הנוסע ממונית כשהוא ביקש נסיעה.

INSERT INTO
 `real_time_taxi_streaming.neighborhood_taxi_health`
WITH potential_matches AS (
 SELECT
   requests._CHANGE_TIMESTAMP AS bq_changed_ts,
   requests.geohash,
   requests.latitude,
   requests.longitude,
   ST_DISTANCE(
     ST_GEOGPOINT(requests.longitude, requests.latitude),
     ST_GEOGPOINT(taxis.longitude, taxis.latitude)
   ) AS distance_in_meters
 FROM
   APPENDS(TABLE `real_time_taxi_streaming.ride_requests`,
     CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) AS requests
 INNER JOIN
   APPENDS(TABLE `real_time_taxi_streaming.taxirides`,
     CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) AS taxis
 ON requests.geohash = taxis.geohash
 WHERE
   taxis.ride_status = 'available'
   AND taxis._CHANGE_TIMESTAMP BETWEEN (requests._CHANGE_TIMESTAMP - INTERVAL 5 MINUTE) AND requests._CHANGE_TIMESTAMP
   AND ST_Dwithin(
     ST_GEOGPOINT(requests.longitude, requests.latitude),
     ST_GEOGPOINT(taxis.longitude, taxis.latitude),
     2000 -- Distance in meters
   )
)
SELECT
 window_end,
 geohash,
 ROUND(AVG(latitude), 6) AS avg_latitude,
 ROUND(AVG(longitude), 6) AS avg_longitude,
 COUNT(*) AS taxi_demand_volume,
 ROUND(AVG(distance_in_meters), 2) AS avg_proximity_meters,
 ROUND(MIN(distance_in_meters), 2) AS min_proximity_meters,
 ROUND(MAX(distance_in_meters), 2) AS max_proximity_meters,
 ROUND(STDDEV(distance_in_meters), 2) AS proximity_stddev
FROM
 TUMBLE(TABLE potential_matches, "bq_changed_ts", INTERVAL 5 MINUTE)
GROUP BY
 window_end,
 geohash;

שינוי ה-SQL של שאילתה מתמשכת

אי אפשר לעדכן את ה-SQL שמשמש בשאילתה מתמשכת בזמן שהשאילתה המתמשכת פועלת. צריך לבטל את משימת השאילתה המתמשכת, לשנות את ה-SQL ואז להתחיל משימת שאילתה מתמשכת חדשה מהנקודה שבה הפסקתם את משימת השאילתה המתמשכת המקורית.

כדי לשנות את ה-SQL שמשמש בשאילתה מתמשכת:

  1. מציגים את פרטי העבודה של עבודת השאילתה המתמשכת שרוצים לעדכן, ורושמים את מזהה העבודה.
  2. אם אפשר, כדאי להשהות את איסוף הנתונים במעלה הזרם. אם לא ניתן לעשות זאת, יכול להיות שיהיה כפילות של חלק מהנתונים כשמפעילים מחדש את השאילתה המתמשכת.
  3. מבטלים את השאילתה המתמשכת שרוצים לשנות.
  4. כדי לקבל את הערך end_time של משימת השאילתה המתמשכת המקורית, משתמשים בתצוגה JOBS:INFORMATION_SCHEMA

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • REGION: האזור שבו נעשה שימוש בפרויקט.
    • JOB_ID: המזהה של משימת השאילתה המתמשכת שזיהיתם בשלב 1.
  5. משנים את הצהרת ה-SQL של השאילתה המתמשכת כדי להתחיל את השאילתה המתמשכת מנקודת זמן מסוימת, באמצעות הערך end_time שאוחזר בשלב 5 כנקודת ההתחלה.

  6. משנים את הצהרת ה-SQL של השאילתה המתמשכת כדי לשקף את השינויים הנדרשים.

  7. מריצים את השאילתה המתמשכת ששיניתם.

ביטול שאילתה מתמשכת

אפשר לבטל משימה של שאילתה מתמשכת בדיוק כמו כל משימה אחרת. יכול להיות שיחלפו עד דקה מרגע ביטול העבודה ועד שהשאילתה תפסיק לפעול.

אם מבטלים שאילתה ואז מפעילים אותה מחדש, השאילתה המופעלת מחדש מתנהגת כמו שאילתה חדשה ועצמאית. השאילתה שהופעלה מחדש לא מתחילה לעבד נתונים במקום שבו העבודה הקודמת נעצרה, ואי אפשר להפנות אותה לתוצאות של השאילתה הקודמת. התחלת שאילתה מתמשכת מנקודת זמן מסוימת

מעקב אחרי שאילתות וטיפול בשגיאות

יכול להיות ששאילתה מתמשכת תופסק בגלל גורמים כמו חוסר עקביות בנתונים, שינויים בסכימה, שיבושים זמניים בשירות או תחזוקה. למרות ש-BigQuery מטפל בחלק מהשגיאות הזמניות, הנה כמה שיטות מומלצות לשיפור העמידות של משימות:

המאמרים הבאים