יצירת שאילתות מתמשכות

במאמר הזה מוסבר איך להריץ שאילתה רציפה ב-BigQuery.

שאילתות מתמשכות ב-BigQuery הן הצהרות SQL שמופעלות באופן רציף. שאילתות רציפות מאפשרות לכם לנתח נתונים נכנסים ב-BigQuery בזמן אמת, ואז לייצא את התוצאות ל-Bigtable, ל-Pub/Sub או ל-Spanner, או לכתוב את התוצאות בטבלה ב-BigQuery.

בחירת סוג החשבון

אפשר ליצור ולהריץ משימת שאילתה מתמשכת באמצעות חשבון משתמש, או ליצור משימת שאילתה מתמשכת באמצעות חשבון משתמש ואז להריץ אותה באמצעות חשבון שירות. כדי להריץ שאילתה מתמשכת שמייצאת תוצאות לנושא Pub/Sub, צריך להשתמש בחשבון שירות.

כשמשתמשים בחשבון משתמש, שאילתה מתמשכת פועלת עד יומיים. כשמשתמשים בחשבון שירות, שאילתה מתמשכת פועלת עד 150 ימים. מידע נוסף זמין במאמר בנושא הרשאות.

ההרשאות הנדרשות

בקטע הזה מוסבר אילו הרשאות נדרשות כדי ליצור ולהריץ שאילתה מתמשכת. אפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית, כחלופה לתפקידים ב-IAM (הפלטפורמה לניהול זהויות והרשאות גישה) שצוינו.

הרשאות כשמשתמשים בחשבון משתמש

בקטע הזה מוסבר על התפקידים וההרשאות שנדרשים כדי ליצור ולהריץ שאילתה מתמשכת באמצעות חשבון משתמש.

כדי ליצור משימה ב-BigQuery, לחשבון המשתמש צריכה להיות הרשאת IAM‏ bigquery.jobs.create. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.jobs.create:

כדי לייצא נתונים מטבלה ב-BigQuery, לחשבון המשתמש צריכה להיות הרשאת IAM‏ bigquery.tables.export. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.tables.export:

כדי לעדכן נתונים בטבלה ב-BigQuery, לחשבון המשתמש צריכה להיות הרשאת bigquery.tables.updateData ב-IAM. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.tables.updateData:

אם חשבון המשתמש צריך להפעיל את ממשקי ה-API שנדרשים לתרחיש השימוש שלכם בשאילתה מתמשכת, לחשבון המשתמש צריך להיות התפקיד אדמין של שימוש בשירות (roles/serviceusage.serviceUsageAdmin).

הרשאות כשמשתמשים בחשבון שירות

בקטע הזה מוסבר על התפקידים וההרשאות שנדרשים לחשבון המשתמש שיוצר את השאילתה המתמשכת ולחשבון השירות שמריץ את השאילתה המתמשכת.

הרשאות בחשבון משתמש

כדי ליצור משימה ב-BigQuery, לחשבון המשתמש צריכה להיות הרשאת IAM‏ bigquery.jobs.create. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.jobs.create:

כדי לשלוח עבודה שמופעלת באמצעות חשבון שירות, לחשבון המשתמש צריך להיות התפקיד משתמש בחשבון שירות (roles/iam.serviceAccountUser). אם אתם משתמשים באותו חשבון משתמש כדי ליצור את חשבון השירות, לחשבון המשתמש צריך להיות התפקיד אדמין של חשבון שירות (roles/iam.serviceAccountAdmin). במאמר הקצאת תפקיד יחיד מוסבר איך להגביל את הגישה של משתמש לחשבון שירות יחיד, ולא לכל חשבונות השירות בפרויקט.

אם חשבון המשתמש צריך להפעיל את ממשקי ה-API שנדרשים לתרחיש השימוש שלכם בשאילתה מתמשכת, לחשבון המשתמש צריך להיות התפקיד אדמין של שימוש בשירות (roles/serviceusage.serviceUsageAdmin).

הרשאות של חשבון שירות

כדי לייצא נתונים מטבלה ב-BigQuery, לחשבון השירות צריכה להיות הרשאת IAM‏ bigquery.tables.export. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.tables.export:

כדי לעדכן נתונים בטבלת BigQuery, לחשבון השירות צריכה להיות הרשאת ה-IAM‏ bigquery.tables.updateData. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.tables.updateData:

לפני שמתחילים

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  2. Verify that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

יצירת בקשה לשמירת מקום

יוצרים הזמנה למהדורת Enterprise או Enterprise Plus, ואז יוצרים הקצאת הזמנה עם סוג משרה CONTINUOUS. אפשר להשתמש בבקשת ההזמנה הזו בהתאמה אוטומטית לעומס (automatic scaling) ובשיתוף משבצות זמן פנויות. יש מגבלות על הזמנות שחלות על הקצאות של הזמנות לשאילתות רציפות.

ייצוא ל-Pub/Sub

כדי לייצא נתונים ל-Pub/Sub, צריך להשתמש בממשקי API נוספים, בהרשאות IAM וב Google Cloud משאבים. מידע נוסף זמין במאמר בנושא ייצוא ל-Pub/Sub.

הטמעת מאפיינים מותאמים אישית כמטא-נתונים בהודעות Pub/Sub

אתם יכולים להשתמש במאפייני Pub/Sub כדי לספק מידע נוסף על ההודעה, כמו העדיפות, המקור, היעד או מטא-נתונים נוספים. אפשר גם להשתמש במאפיינים כדי לסנן הודעות במינוי.

בתוצאה של שאילתה מתמשכת, אם שם העמודה הוא _ATTRIBUTES, הערכים שלה מועתקים למאפיינים של הודעת Pub/Sub. השדות שצוינו ב-_ATTRIBUTES משמשים כמפתחות מאפיינים.

העמודה _ATTRIBUTES צריכה להיות מסוג JSON, בפורמט ARRAY<STRUCT<STRING, STRING>> או STRUCT<STRING>.

דוגמה אפשר לראות במאמר בנושא ייצוא נתונים לנושא Pub/Sub.

ייצוא אל Bigtable

כדי לייצא נתונים ל-Bigtable, צריך להשתמש בממשקי API נוספים, בהרשאות IAM ובמשאבים Google Cloud נוספים. מידע נוסף זמין במאמר בנושא ייצוא ל-Bigtable.

ייצוא ל-Spanner

כדי לייצא נתונים ל-Spanner, צריך ממשקי API נוספים, הרשאות IAM ומשאבים Google Cloud נוספים. מידע נוסף זמין במאמר בנושא ייצוא ל-Spanner (תהליך ETL הפוך).

כתיבת נתונים לטבלה ב-BigQuery

אפשר לכתוב נתונים לטבלת BigQuery באמצעות הצהרת INSERT.

שימוש בפונקציות AI

כדי להשתמש בפונקציית AI נתמכת ב<term ref="continuous query">שאילתה מתמשכת</term>, נדרשים ממשקי API נוספים, הרשאות IAM ומשאבים. Google Cloudלמידע נוסף, אפשר לעיין באחד מהנושאים הבאים, בהתאם לתרחיש השימוש:

כשמשתמשים בפונקציית AI בשאילתה מתמשכת, חשוב לבדוק אם הפלט של השאילתה יישאר במסגרת המכסה של הפונקציה. אם חורגים מהמכסה, יכול להיות שתצטרכו לטפל בנפרד ברשומות שלא יעברו עיבוד.

מציינים שעת התחלה לשאילתה מתמשכת

כדי לציין את הנתונים המוקדמים ביותר לעיבוד, צריך להשתמש בפונקציה APPENDS בסעיף FROM של שאילתה מתמשכת. לדוגמה, APPENDS(TABLE my_table, start_timestamp).

הארגומנט start_timestamp מגדיר את נקודת הזמן שבה השאילתה המתמשכת מתחילה לעבד נתונים. לדוגמה, APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) אומר ל-BigQuery לעבד נתונים שנוספו לטבלה my_table עד 10 דקות לפני תחילת השאילתה המתמשכת. נתונים שנוספים ל-my_table לאחר מכן מעובדים כשהם מתקבלים. לא מוטל עיכוב על עיבוד הנתונים.

כשמציינים את הארגומנט start_timestamp, הערך חייב להיות בטווח של חלון הזמן של הטבלה, שמוגדר כברירת מחדל לשבעה ימים בטבלאות רגילות. הגדרת start_timestamp ל-NULL תגרום לכך שברירת המחדל תהיה זמן היצירה של הטבלה. לא מומלץ להשתמש בערך NULL כי אם הטבלה נוצרה לפני חלון הזמן של האפשרות 'חזרה בזמן', המערכת תחזיר שגיאה. שאילתה באמצעות NULL עשויה להצליח בטבלה שנוצרה לאחרונה, אבל להיכשל בהמשך אם חותמת הזמן של יצירת הטבלה לא תהיה בטווח של שבעת הימים.

אל תציינו ארגומנט end_timestamp לפונקציה APPENDS כשמשתמשים בה בשאילתה מתמשכת.

בדוגמה הבאה מוצג איך להפעיל שאילתה מתמשכת מנקודת זמן מסוימת באמצעות הפונקציה APPENDS, כשמריצים שאילתה על טבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בסטרימינג:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE
    ride_status = 'enroute');

הגדרה של נקודת התחלה מוקדמת יותר מחלון הזמן של הנסיעה

כדי לכלול נתונים שלא נכללים בחלון הזמן של שבעה ימים של 'מסע בזמן', צריך להשתמש בשאילתה רגילה כדי למלא את החוסרים בנתונים עד לנקודת זמן מסוימת, ואז להפעיל שאילתה מתמשכת מאותה נקודה. חותמת הזמן של ה-handover צריכה להיות בטווח של שבעה ימים, כדי שהשאילתה המתמשכת תוכל להמשיך מהמקום שבו השאילתה הרגילה הפסיקה.

בדוגמה הבאה מוצג איך למלא מחדש נתונים ישנים מטבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בסטרימינג, ואז לעבור לשאילתה מתמשכת.

  1. מריצים שאילתה רגילה כדי למלא נתונים חסרים עד לנקודת זמן מסוימת:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM `myproject.real_time_taxi_streaming.taxirides`
      -- Include all data inserted into the table up to this handoff point.
      -- This handoff timestamp must be within the time travel window.
      FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC'
    WHERE
      ride_status = 'dropoff';
  2. להריץ שאילתה מתמשכת מהנקודה בזמן שבה השאילתה נעצרה:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM
      APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to start processing
        -- data right where the batch query left off + 1 microsecond.
        -- This timestamp must be within the time travel window.
        TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND)
    WHERE
      ride_status = 'dropoff';

הרצת שאילתה מתמשכת באמצעות חשבון משתמש

בקטע הזה מוסבר איך להריץ שאילתה מתמשכת באמצעות חשבון משתמש. אחרי שהשאילתה המתמשכת פועלת, אפשר לסגור את Google Cloud המסוף, את חלון הטרמינל או את האפליקציה בלי להפריע להרצת השאילתה. שאילתה מתמשכת שמופעלת על ידי חשבון משתמש פועלת למשך יומיים לכל היותר ואז נעצרת באופן אוטומטי. כדי להמשיך לעבד נתונים חדשים שמגיעים, צריך להתחיל שאילתה מתמשכת חדשה ולהגדיר נקודת התחלה. כדי לבצע את התהליך הזה אוטומטית, אפשר לעיין במאמר בנושא ניסיון חוזר של שאילתות שנכשלו.

כדי להריץ שאילתה מתמשכת:

המסוף

  1. במסוף Google Cloud , עוברים לדף BigQuery.

    כניסה ל-BigQuery

  2. בעורך השאילתות, לוחצים על עוד.

    1. בקטע Choose query mode (בחירת מצב שאילתה), בוחרים באפשרות Continuous query (שאילתה מתמשכת).
    2. לוחצים על אישור.
    3. אופציונלי: כדי לקבוע כמה זמן השאילתה תפעל, לוחצים על הגדרות השאילתה ומגדירים את הזמן הקצוב לתפוגה של העבודה באלפיות השנייה.
  3. בעורך השאילתות, מקלידים את הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות.

  4. לוחצים על Run.

BQ

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  2. ב-Cloud Shell, מריצים את השאילתה המתמשכת באמצעות הפקודה bq query עם הדגל --continuous:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    מחליפים את QUERY בהצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות. אפשר לקבוע כמה זמן השאילתה תפעל באמצעות הדגל --job_timeout_ms.

  3. API

    מריצים את השאילתה המתמשכת על ידי קריאה ל-method‏ jobs.insert. צריך להגדיר את השדה continuous לערך true ב-JobConfigurationQuery של משאב Job שמעבירים. אפשר גם להגדיר את השדה jobTimeoutMs כדי לקבוע כמה זמן השאילתה תפעל.

    curl --request POST \
      "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
      --header "Authorization: Bearer $(gcloud auth print-access-token)" \
      --header "Content-Type: application/json; charset=utf-8" \
      --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \
      --compressed

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • QUERY: הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות.

הרצת שאילתה מתמשכת באמצעות חשבון שירות

בקטע הזה מוסבר איך להריץ שאילתה מתמשכת באמצעות חשבון שירות. אחרי שהשאילתה המתמשכת פועלת, אפשר לסגור את Google Cloud המסוף, את חלון הטרמינל או את האפליקציה בלי להפריע להרצת השאילתה. שאילתה מתמשכת שמופעלת באמצעות חשבון שירות יכולה לפעול עד 150 ימים ואז היא נעצרת באופן אוטומטי. כדי להמשיך לעבד נתונים חדשים שמגיעים, צריך להתחיל שאילתה מתמשכת חדשה ולהגדיר נקודת התחלה. כדי להפוך את התהליך הזה לאוטומטי, אפשר לעיין במאמר בנושא ניסיון חוזר של שאילתות שנכשלו.

כדי להשתמש בחשבון שירות להרצת שאילתה מתמשכת:

המסוף

  1. יצירה של חשבון שירות.
  2. מקצים לחשבון השירות את ההרשאות הנדרשות.
  3. במסוף Google Cloud , עוברים לדף BigQuery.

    כניסה ל-BigQuery

  4. בעורך השאילתות, לוחצים על עוד.

  5. בקטע Choose query mode (בחירת מצב שאילתה), בוחרים באפשרות Continuous query (שאילתה מתמשכת).

  6. לוחצים על אישור.

  7. בעורך השאילתות, לוחצים על More > Query settings.

  8. בקטע שאילתה מתמשכת, משתמשים בתיבה Service account כדי לבחור את חשבון השירות שיצרתם.

  9. אופציונלי: כדי לקבוע כמה זמן השאילתה תפעל, מגדירים את הזמן הקצוב לתפוגה של העבודה באלפיות השנייה.

  10. לוחצים על Save.

  11. בעורך השאילתות, מקלידים את הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות.

  12. לוחצים על Run.

BQ

  1. יצירה של חשבון שירות.
  2. מקצים לחשבון השירות את ההרשאות הנדרשות.
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  4. בשורת הפקודה, מריצים את השאילתה המתמשכת באמצעות הפקודה bq query עם הדגלים הבאים:

    • מגדירים את הדגל --continuous לערך true כדי שהשאילתה תהיה רציפה.
    • משתמשים בדגל --connection_property כדי לציין חשבון שירות לשימוש.
    • אופציונלי: מגדירים את הדגל --job_timeout_ms כדי להגביל את זמן הריצה של השאילתה.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • SERVICE_ACCOUNT_EMAIL: כתובת האימייל של חשבון השירות. אפשר לקבל את כתובת האימייל של חשבון השירות מהדף Service accounts במסוף Google Cloud .
    • QUERY: הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות.
  5. API

    1. יצירה של חשבון שירות.
    2. מקצים לחשבון השירות את ההרשאות הנדרשות.
    3. מריצים את השאילתה המתמשכת על ידי קריאה ל-method‏ jobs.insert. מגדירים את השדות הבאים במשאב JobConfigurationQuery של משאב Job שמעבירים:

      • מגדירים את השדה continuous לערך true כדי שהשאילתה תפעל ברציפות.
      • משתמשים בשדה connectionProperties כדי לציין חשבון שירות לשימוש.

      אפשר גם לשלוט באורך הזמן שבו השאילתה פועלת על ידי הגדרת השדה jobTimeoutMs במשאב JobConfiguration.

      curl --request POST \
        "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
        --header "Authorization: Bearer $(gcloud auth print-access-token)" \
        --header "Content-Type: application/json; charset=utf-8" \
        --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \
        --compressed

      מחליפים את מה שכתוב בשדות הבאים:

      • PROJECT_ID: מזהה הפרויקט.
      • QUERY: הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות.
      • SERVICE_ACCOUNT_EMAIL: כתובת האימייל של חשבון השירות. אפשר למצוא את כתובת האימייל של חשבון השירות בדף Service accounts במסוף Google Cloud .

יצירת מזהה משרה מותאם אישית

לכל עבודת שאילתה מוקצה מזהה עבודה שאפשר להשתמש בו כדי לחפש את העבודה ולנהל אותה. כברירת מחדל, מזהי המשימות נוצרים באופן אקראי. כדי להקל על החיפוש של מזהה המשימה של שאילתה מתמשכת באמצעות היסטוריית המשימות או כלי לבדיקת משימות, אפשר להקצות קידומת מותאמת אישית למזהה המשימה:

  1. במסוף Google Cloud , עוברים לדף BigQuery.

    כניסה ל-BigQuery

  2. בעורך השאילתות, לוחצים על עוד.

  3. בקטע Choose query mode (בחירת מצב שאילתה), בוחרים באפשרות Continuous query (שאילתה מתמשכת).

  4. לוחצים על אישור.

  5. בעורך השאילתות, לוחצים על More > Query settings (עוד > הגדרות השאילתה).

  6. בקטע Custom job ID prefix (קידומת מותאמת אישית למזהה המשרה), מזינים קידומת מותאמת אישית לשם.

  7. לוחצים על Save.

דוגמאות

בדוגמאות הבאות של SQL מוצגים תרחישי שימוש נפוצים בשאילתות רציפות.

ייצוא נתונים לנושא Pub/Sub

בדוגמה הבאה מוצגת שאילתה מתמשכת שמסננת נתונים מטבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בסטרימינג, ומפרסמת את הנתונים בנושא Pub/Sub בזמן אמת עם מאפייני הודעה:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

ייצוא נתונים לטבלת Bigtable

בדוגמה הבאה מוצגת שאילתה מתמשכת שמסננת נתונים מטבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בסטרימינג, ומייצאת את הנתונים לטבלה ב-Bigtable בזמן אמת:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

ייצוא נתונים לטבלת Spanner

בדוגמה הבאה מוצגת שאילתה מתמשכת שמסננת נתונים מטבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בסטרימינג, ואז מייצאת את הנתונים לטבלת Spanner בזמן אמת:

EXPORT DATA
 OPTIONS (
   format = 'CLOUD_SPANNER',
   uri = 'https://spanner.googleapis.com/projects/myproject/instances/myspannerinstance/databases/taxi-real-time-rides',
   spanner_options ="""{
      "table": "rides",
      -- To ensure data is written to Spanner in the correct sequence
      -- during a continuous export, use the change_timestamp_column
      -- option. This should be mapped to a timestamp column from your
      -- BigQuery data. If your source data lacks a timestamp, the
      -- _CHANGE_TIMESTAMP pseudocolumn provided by the APPENDS function
      -- will be automatically mapped to the "change_timestamp" column.
      "change_timestamp_column": "change_timestamp"
   }"""
  )
  AS (
  SELECT
    ride_id,
    latitude,
    longitude,
    meter_reading,
    ride_status,
    passenger_count,
    _CHANGE_TIMESTAMP as change_timestamp
  FROM APPENDS(
        TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to specify when you want to
        -- start processing data using your continuous query.
        -- This example starts processing at 10 minutes before the current time.
        CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
  );

כתיבת נתונים לטבלה ב-BigQuery

בדוגמה הבאה מוצגת שאילתה מתמשכת שמסננת ומשנה נתונים מטבלה ב-BigQuery שמקבלת מידע בסטרימינג על נסיעות במונית, ואז כותבת את הנתונים לטבלה אחרת ב-BigQuery בזמן אמת. כך הנתונים זמינים לניתוח נוסף בהמשך.

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM
  APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
    -- Configure the APPENDS TVF start_timestamp to specify when you want to
    -- start processing data using your continuous query.
    -- This example starts processing at 10 minutes before the current time.
    CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
WHERE
  ride_status = 'dropoff';

עיבוד נתונים באמצעות מודל של Vertex AI

בדוגמה הבאה מוצגת שאילתה מתמשכת שמשתמשת במודל Vertex AI כדי ליצור מודעה לנוסעים במונית על סמך קו הרוחב וקו האורך הנוכחיים שלהם, ואז מייצאת את התוצאות לנושא Pub/Sub בזמן אמת:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        result)) AS message
  FROM
    AI.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM
          APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
            -- Configure the APPENDS TVF start_timestamp to specify when you
            -- want to start processing data using your continuous query.
            -- This example starts processing at 10 minutes before the current time.
            CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p))
      AS ml_output
);

שינוי ה-SQL של שאילתה מתמשכת

אי אפשר לעדכן את ה-SQL שמשמש בשאילתה מתמשכת בזמן שהשאילתה המתמשכת פועלת. צריך לבטל את העבודה של השאילתה המתמשכת, לשנות את ה-SQL ואז להתחיל עבודה חדשה של שאילתה מתמשכת מהנקודה שבה הפסקתם את העבודה המקורית של השאילתה המתמשכת.

כדי לשנות את ה-SQL שמשמש בשאילתה מתמשכת:

  1. מציגים את פרטי העבודה של עבודת השאילתה המתמשכת שרוצים לעדכן, ורושמים את מזהה העבודה.
  2. אם אפשר, כדאי להשהות את איסוף הנתונים במעלה הזרם. אם לא תעשו את זה, יכול להיות שיהיה כפילות של חלק מהנתונים כשהשאילתה המתמשכת תופעל מחדש.
  3. מבטלים את השאילתה המתמשכת שרוצים לשנות.
  4. כדי לקבל את הערך end_time של משימת השאילתה המתמשכת המקורית, משתמשים בתצוגה INFORMATION_SCHEMA JOBS:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • REGION: האזור שבו נעשה שימוש בפרויקט.
    • JOB_ID: מזהה משימה של שאילתה מתמשכת שזיהיתם בשלב 1.
  5. משנים את הצהרת ה-SQL של השאילתה המתמשכת כדי להתחיל את השאילתה המתמשכת מנקודת זמן מסוימת, באמצעות הערך end_time שאוחזר בשלב 5 כנקודת ההתחלה.

  6. משנים את הצהרת ה-SQL של השאילתה המתמשכת כדי לשקף את השינויים הנדרשים.

  7. מריצים את השאילתה המתמשכת ששיניתם.

ביטול שאילתה מתמשכת

אפשר לבטל משימה של שאילתה מתמשכת בדיוק כמו כל משימה אחרת. יכול להיות שיחלפו עד דקה מרגע ביטול העבודה ועד שהשאילתה תפסיק לפעול.

אם מבטלים שאילתה ואז מפעילים אותה מחדש, השאילתה המופעלת מחדש מתנהגת כמו שאילתה חדשה ועצמאית. השאילתה שהופעלה מחדש לא מתחילה לעבד נתונים במקום שבו העבודה הקודמת נעצרה, ואי אפשר להפנות אותה לתוצאות של השאילתה הקודמת. איך מתחילים שאילתה מתמשכת מנקודת זמן מסוימת

מעקב אחרי שאילתות וטיפול בשגיאות

יכול להיות ששאילתה מתמשכת תופסק בגלל גורמים כמו חוסר עקביות בנתונים, שינויים בסכימה, שיבושים זמניים בשירות או תחזוקה. למרות שמערכת BigQuery מטפלת בחלק מהשגיאות הזמניות, הנה כמה שיטות מומלצות לשיפור העמידות של משימות:

המאמרים הבאים