יצירת שאילתות מתמשכות
במאמר הזה מוסבר איך להריץ שאילתה רציפה ב-BigQuery.
שאילתות מתמשכות ב-BigQuery הן הצהרות SQL שמופעלות באופן רציף. שאילתות רציפות מאפשרות לכם לנתח נתונים נכנסים ב-BigQuery בזמן אמת, ואז לייצא את התוצאות ל-Bigtable, ל-Pub/Sub או ל-Spanner, או לכתוב את התוצאות בטבלה ב-BigQuery.
בחירת סוג החשבון
אפשר ליצור ולהריץ משימת שאילתה מתמשכת באמצעות חשבון משתמש, או ליצור משימת שאילתה מתמשכת באמצעות חשבון משתמש ואז להריץ אותה באמצעות חשבון שירות. כדי להריץ שאילתה מתמשכת שמייצאת תוצאות לנושא Pub/Sub, צריך להשתמש בחשבון שירות.
כשמשתמשים בחשבון משתמש, שאילתה מתמשכת פועלת עד יומיים. כשמשתמשים בחשבון שירות, שאילתה מתמשכת פועלת עד 150 ימים. מידע נוסף זמין במאמר בנושא הרשאות.
ההרשאות הנדרשות
בקטע הזה מוסבר אילו הרשאות נדרשות כדי ליצור ולהריץ שאילתה מתמשכת. אפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית, כחלופה לתפקידים ב-IAM (הפלטפורמה לניהול זהויות והרשאות גישה) שצוינו.
הרשאות כשמשתמשים בחשבון משתמש
בקטע הזה מוסבר על התפקידים וההרשאות שנדרשים כדי ליצור ולהריץ שאילתה מתמשכת באמצעות חשבון משתמש.
כדי ליצור משימה ב-BigQuery, לחשבון המשתמש צריכה להיות הרשאת IAM bigquery.jobs.create. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.jobs.create:
- BigQuery User (
roles/bigquery.user) - BigQuery Job User (
roles/bigquery.jobUser) - BigQuery Admin (
roles/bigquery.admin)
כדי לייצא נתונים מטבלה ב-BigQuery, לחשבון המשתמש צריכה להיות הרשאת IAM bigquery.tables.export. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.tables.export:
- BigQuery Data Viewer (
roles/bigquery.dataViewer) - עריכה של נתוני BigQuery (
roles/bigquery.dataEditor) - בעלים של נתוני BigQuery (
roles/bigquery.dataOwner) - BigQuery Admin (
roles/bigquery.admin)
כדי לעדכן נתונים בטבלה ב-BigQuery, לחשבון המשתמש צריכה להיות הרשאת bigquery.tables.updateData ב-IAM. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.tables.updateData:
- עריכה של נתוני BigQuery (
roles/bigquery.dataEditor) - בעלים של נתוני BigQuery (
roles/bigquery.dataOwner) - BigQuery Admin (
roles/bigquery.admin)
אם חשבון המשתמש צריך להפעיל את ממשקי ה-API שנדרשים לתרחיש השימוש שלכם בשאילתה מתמשכת, לחשבון המשתמש צריך להיות התפקיד אדמין של שימוש בשירות (roles/serviceusage.serviceUsageAdmin).
הרשאות כשמשתמשים בחשבון שירות
בקטע הזה מוסבר על התפקידים וההרשאות שנדרשים לחשבון המשתמש שיוצר את השאילתה המתמשכת ולחשבון השירות שמריץ את השאילתה המתמשכת.
הרשאות בחשבון משתמש
כדי ליצור משימה ב-BigQuery, לחשבון המשתמש צריכה להיות הרשאת IAM bigquery.jobs.create. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.jobs.create:
- BigQuery User (
roles/bigquery.user) - BigQuery Job User (
roles/bigquery.jobUser) - BigQuery Admin (
roles/bigquery.admin)
כדי לשלוח עבודה שמופעלת באמצעות חשבון שירות, לחשבון המשתמש צריך להיות התפקיד משתמש בחשבון שירות (roles/iam.serviceAccountUser). אם אתם משתמשים באותו חשבון משתמש כדי ליצור את חשבון השירות, לחשבון המשתמש צריך להיות התפקיד אדמין של חשבון שירות (roles/iam.serviceAccountAdmin). במאמר הקצאת תפקיד יחיד מוסבר איך להגביל את הגישה של משתמש לחשבון שירות יחיד, ולא לכל חשבונות השירות בפרויקט.
אם חשבון המשתמש צריך להפעיל את ממשקי ה-API שנדרשים לתרחיש השימוש שלכם בשאילתה מתמשכת, לחשבון המשתמש צריך להיות התפקיד אדמין של שימוש בשירות (roles/serviceusage.serviceUsageAdmin).
הרשאות של חשבון שירות
כדי לייצא נתונים מטבלה ב-BigQuery, לחשבון השירות צריכה להיות הרשאת IAM bigquery.tables.export. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.tables.export:
- BigQuery Data Viewer (
roles/bigquery.dataViewer) - עריכה של נתוני BigQuery (
roles/bigquery.dataEditor) - בעלים של נתוני BigQuery (
roles/bigquery.dataOwner) - BigQuery Admin (
roles/bigquery.admin)
bigquery.tables.updateData. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.tables.updateData:
- עריכה של נתוני BigQuery (
roles/bigquery.dataEditor) - בעלים של נתוני BigQuery (
roles/bigquery.dataOwner) - BigQuery Admin (
roles/bigquery.admin)
לפני שמתחילים
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
יצירת בקשה לשמירת מקום
יוצרים הזמנה למהדורת Enterprise או Enterprise Plus,
ואז יוצרים הקצאת הזמנה
עם סוג משרה CONTINUOUS. אפשר להשתמש בבקשת ההזמנה הזו בהתאמה אוטומטית לעומס (automatic scaling) ובשיתוף משבצות זמן פנויות.
יש מגבלות על הזמנות שחלות על הקצאות של הזמנות לשאילתות רציפות.
ייצוא ל-Pub/Sub
כדי לייצא נתונים ל-Pub/Sub, צריך להשתמש בממשקי API נוספים, בהרשאות IAM וב Google Cloud משאבים. מידע נוסף זמין במאמר בנושא ייצוא ל-Pub/Sub.
הטמעת מאפיינים מותאמים אישית כמטא-נתונים בהודעות Pub/Sub
אתם יכולים להשתמש במאפייני Pub/Sub כדי לספק מידע נוסף על ההודעה, כמו העדיפות, המקור, היעד או מטא-נתונים נוספים. אפשר גם להשתמש במאפיינים כדי לסנן הודעות במינוי.
בתוצאה של שאילתה מתמשכת, אם שם העמודה הוא _ATTRIBUTES, הערכים שלה מועתקים למאפיינים של הודעת Pub/Sub.
השדות שצוינו ב-_ATTRIBUTES משמשים כמפתחות מאפיינים.
העמודה _ATTRIBUTES צריכה להיות מסוג JSON, בפורמט ARRAY<STRUCT<STRING, STRING>> או STRUCT<STRING>.
דוגמה אפשר לראות במאמר בנושא ייצוא נתונים לנושא Pub/Sub.
ייצוא אל Bigtable
כדי לייצא נתונים ל-Bigtable, צריך להשתמש בממשקי API נוספים, בהרשאות IAM ובמשאבים Google Cloud נוספים. מידע נוסף זמין במאמר בנושא ייצוא ל-Bigtable.
ייצוא ל-Spanner
כדי לייצא נתונים ל-Spanner, צריך ממשקי API נוספים, הרשאות IAM ומשאבים Google Cloud נוספים. מידע נוסף זמין במאמר בנושא ייצוא ל-Spanner (תהליך ETL הפוך).
כתיבת נתונים לטבלה ב-BigQuery
אפשר לכתוב נתונים לטבלת BigQuery באמצעות הצהרת INSERT.
שימוש בפונקציות AI
כדי להשתמש בפונקציית AI נתמכת ב<term ref="continuous query">שאילתה מתמשכת</term>, נדרשים ממשקי API נוספים, הרשאות IAM ומשאבים. Google Cloudלמידע נוסף, אפשר לעיין באחד מהנושאים הבאים, בהתאם לתרחיש השימוש:
- יצירת טקסט באמצעות הפונקציה
AI.GENERATE_TEXT - יצירת הטמעות טקסט באמצעות הפונקציה
AI.GENERATE_EMBEDDING - הבנת טקסט באמצעות הפונקציה
ML.UNDERSTAND_TEXT - תרגום טקסט באמצעות הפונקציה
ML.TRANSLATE
כשמשתמשים בפונקציית AI בשאילתה מתמשכת, חשוב לבדוק אם הפלט של השאילתה יישאר במסגרת המכסה של הפונקציה. אם חורגים מהמכסה, יכול להיות שתצטרכו לטפל בנפרד ברשומות שלא יעברו עיבוד.
מציינים שעת התחלה לשאילתה מתמשכת
כדי לציין את הנתונים המוקדמים ביותר לעיבוד, צריך להשתמש בפונקציה APPENDS בסעיף FROM של שאילתה מתמשכת. לדוגמה, APPENDS(TABLE my_table, start_timestamp).
הארגומנט start_timestamp מגדיר את נקודת הזמן שבה השאילתה המתמשכת מתחילה לעבד נתונים. לדוגמה, APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) אומר ל-BigQuery לעבד נתונים שנוספו לטבלה my_table עד 10 דקות לפני תחילת השאילתה המתמשכת.
נתונים שנוספים ל-my_table לאחר מכן מעובדים כשהם מתקבלים. לא מוטל עיכוב על עיבוד הנתונים.
כשמציינים את הארגומנט start_timestamp, הערך חייב להיות בטווח של חלון הזמן של הטבלה, שמוגדר כברירת מחדל לשבעה ימים בטבלאות רגילות. הגדרת start_timestamp ל-NULL תגרום לכך שברירת המחדל תהיה זמן היצירה של הטבלה. לא מומלץ להשתמש בערך NULL כי אם הטבלה נוצרה לפני חלון הזמן של האפשרות 'חזרה בזמן', המערכת תחזיר שגיאה. שאילתה באמצעות NULL עשויה להצליח בטבלה שנוצרה לאחרונה, אבל להיכשל בהמשך אם חותמת הזמן של יצירת הטבלה לא תהיה בטווח של שבעת הימים.
אל תציינו ארגומנט end_timestamp לפונקציה APPENDS כשמשתמשים בה בשאילתה מתמשכת.
בדוגמה הבאה מוצג איך להפעיל שאילתה מתמשכת מנקודת זמן מסוימת באמצעות הפונקציה APPENDS, כשמריצים שאילתה על טבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בסטרימינג:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute');
הגדרה של נקודת התחלה מוקדמת יותר מחלון הזמן של הנסיעה
כדי לכלול נתונים שלא נכללים בחלון הזמן של שבעה ימים של 'מסע בזמן', צריך להשתמש בשאילתה רגילה כדי למלא את החוסרים בנתונים עד לנקודת זמן מסוימת, ואז להפעיל שאילתה מתמשכת מאותה נקודה. חותמת הזמן של ה-handover צריכה להיות בטווח של שבעה ימים, כדי שהשאילתה המתמשכת תוכל להמשיך מהמקום שבו השאילתה הרגילה הפסיקה.
בדוגמה הבאה מוצג איך למלא מחדש נתונים ישנים מטבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בסטרימינג, ואז לעבור לשאילתה מתמשכת.
מריצים שאילתה רגילה כדי למלא נתונים חסרים עד לנקודת זמן מסוימת:
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` -- Include all data inserted into the table up to this handoff point. -- This handoff timestamp must be within the time travel window. FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC' WHERE ride_status = 'dropoff';
להריץ שאילתה מתמשכת מהנקודה בזמן שבה השאילתה נעצרה:
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to start processing -- data right where the batch query left off + 1 microsecond. -- This timestamp must be within the time travel window. TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND) WHERE ride_status = 'dropoff';
הרצת שאילתה מתמשכת באמצעות חשבון משתמש
בקטע הזה מוסבר איך להריץ שאילתה מתמשכת באמצעות חשבון משתמש. אחרי שהשאילתה המתמשכת פועלת, אפשר לסגור את Google Cloud המסוף, את חלון הטרמינל או את האפליקציה בלי להפריע להרצת השאילתה. שאילתה מתמשכת שמופעלת על ידי חשבון משתמש פועלת למשך יומיים לכל היותר ואז נעצרת באופן אוטומטי. כדי להמשיך לעבד נתונים חדשים שמגיעים, צריך להתחיל שאילתה מתמשכת חדשה ולהגדיר נקודת התחלה. כדי לבצע את התהליך הזה אוטומטית, אפשר לעיין במאמר בנושא ניסיון חוזר של שאילתות שנכשלו.
כדי להריץ שאילתה מתמשכת:
המסוף
במסוף Google Cloud , עוברים לדף BigQuery.
בעורך השאילתות, לוחצים על עוד.
- בקטע Choose query mode (בחירת מצב שאילתה), בוחרים באפשרות Continuous query (שאילתה מתמשכת).
- לוחצים על אישור.
- אופציונלי: כדי לקבוע כמה זמן השאילתה תפעל, לוחצים על הגדרות השאילתה ומגדירים את הזמן הקצוב לתפוגה של העבודה באלפיות השנייה.
בעורך השאילתות, מקלידים את הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות.
לוחצים על Run.
BQ
-
In the Google Cloud console, activate Cloud Shell.
ב-Cloud Shell, מריצים את השאילתה המתמשכת באמצעות הפקודה
bq queryעם הדגל--continuous:bq query --use_legacy_sql=false --continuous=true 'QUERY'
מחליפים את
QUERYבהצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות. אפשר לקבוע כמה זמן השאילתה תפעל באמצעות הדגל--job_timeout_ms.PROJECT_ID: מזהה הפרויקט.-
QUERY: הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות.
API
מריצים את השאילתה המתמשכת על ידי קריאה ל-method jobs.insert.
צריך להגדיר את השדה continuous לערך true ב-JobConfigurationQuery של משאב Job שמעבירים.
אפשר גם להגדיר את השדה jobTimeoutMs כדי לקבוע כמה זמן השאילתה תפעל.
curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \ --compressed
מחליפים את מה שכתוב בשדות הבאים:
הרצת שאילתה מתמשכת באמצעות חשבון שירות
בקטע הזה מוסבר איך להריץ שאילתה מתמשכת באמצעות חשבון שירות. אחרי שהשאילתה המתמשכת פועלת, אפשר לסגור את Google Cloud המסוף, את חלון הטרמינל או את האפליקציה בלי להפריע להרצת השאילתה. שאילתה מתמשכת שמופעלת באמצעות חשבון שירות יכולה לפעול עד 150 ימים ואז היא נעצרת באופן אוטומטי. כדי להמשיך לעבד נתונים חדשים שמגיעים, צריך להתחיל שאילתה מתמשכת חדשה ולהגדיר נקודת התחלה. כדי להפוך את התהליך הזה לאוטומטי, אפשר לעיין במאמר בנושא ניסיון חוזר של שאילתות שנכשלו.
כדי להשתמש בחשבון שירות להרצת שאילתה מתמשכת:
המסוף
- יצירה של חשבון שירות.
- מקצים לחשבון השירות את ההרשאות הנדרשות.
במסוף Google Cloud , עוברים לדף BigQuery.
בעורך השאילתות, לוחצים על עוד.
בקטע Choose query mode (בחירת מצב שאילתה), בוחרים באפשרות Continuous query (שאילתה מתמשכת).
לוחצים על אישור.
בעורך השאילתות, לוחצים על More > Query settings.
בקטע שאילתה מתמשכת, משתמשים בתיבה Service account כדי לבחור את חשבון השירות שיצרתם.
אופציונלי: כדי לקבוע כמה זמן השאילתה תפעל, מגדירים את הזמן הקצוב לתפוגה של העבודה באלפיות השנייה.
לוחצים על Save.
בעורך השאילתות, מקלידים את הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות.
לוחצים על Run.
BQ
- יצירה של חשבון שירות.
- מקצים לחשבון השירות את ההרשאות הנדרשות.
-
In the Google Cloud console, activate Cloud Shell.
בשורת הפקודה, מריצים את השאילתה המתמשכת באמצעות הפקודה
bq queryעם הדגלים הבאים:- מגדירים את הדגל
--continuousלערךtrueכדי שהשאילתה תהיה רציפה. - משתמשים בדגל
--connection_propertyכדי לציין חשבון שירות לשימוש. - אופציונלי: מגדירים את הדגל
--job_timeout_msכדי להגביל את זמן הריצה של השאילתה.
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
מחליפים את מה שכתוב בשדות הבאים:
PROJECT_ID: מזהה הפרויקט.-
SERVICE_ACCOUNT_EMAIL: כתובת האימייל של חשבון השירות. אפשר לקבל את כתובת האימייל של חשבון השירות מהדף Service accounts במסוף Google Cloud . -
QUERY: הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות.
- מגדירים את הדגל
- יצירה של חשבון שירות.
- מקצים לחשבון השירות את ההרשאות הנדרשות.
מריצים את השאילתה המתמשכת על ידי קריאה ל-method
jobs.insert. מגדירים את השדות הבאים במשאבJobConfigurationQueryשל משאבJobשמעבירים:- מגדירים את השדה
continuousלערךtrueכדי שהשאילתה תפעל ברציפות. - משתמשים בשדה
connectionPropertiesכדי לציין חשבון שירות לשימוש.
אפשר גם לשלוט באורך הזמן שבו השאילתה פועלת על ידי הגדרת השדה
jobTimeoutMsבמשאבJobConfiguration.curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \ --compressed
מחליפים את מה שכתוב בשדות הבאים:
PROJECT_ID: מזהה הפרויקט.-
QUERY: הצהרת ה-SQL של השאילתה המתמשכת. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות. -
SERVICE_ACCOUNT_EMAIL: כתובת האימייל של חשבון השירות. אפשר למצוא את כתובת האימייל של חשבון השירות בדף Service accounts במסוף Google Cloud .
- מגדירים את השדה
API
יצירת מזהה משרה מותאם אישית
לכל עבודת שאילתה מוקצה מזהה עבודה שאפשר להשתמש בו כדי לחפש את העבודה ולנהל אותה. כברירת מחדל, מזהי המשימות נוצרים באופן אקראי. כדי להקל על החיפוש של מזהה המשימה של שאילתה מתמשכת באמצעות היסטוריית המשימות או כלי לבדיקת משימות, אפשר להקצות קידומת מותאמת אישית למזהה המשימה:
במסוף Google Cloud , עוברים לדף BigQuery.
בעורך השאילתות, לוחצים על עוד.
בקטע Choose query mode (בחירת מצב שאילתה), בוחרים באפשרות Continuous query (שאילתה מתמשכת).
לוחצים על אישור.
בעורך השאילתות, לוחצים על More > Query settings (עוד > הגדרות השאילתה).
בקטע Custom job ID prefix (קידומת מותאמת אישית למזהה המשרה), מזינים קידומת מותאמת אישית לשם.
לוחצים על Save.
דוגמאות
בדוגמאות הבאות של SQL מוצגים תרחישי שימוש נפוצים בשאילתות רציפות.
ייצוא נתונים לנושא Pub/Sub
בדוגמה הבאה מוצגת שאילתה מתמשכת שמסננת נתונים מטבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בסטרימינג, ומפרסמת את הנתונים בנושא Pub/Sub בזמן אמת עם מאפייני הודעה:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
ייצוא נתונים לטבלת Bigtable
בדוגמה הבאה מוצגת שאילתה מתמשכת שמסננת נתונים מטבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בסטרימינג, ומייצאת את הנתונים לטבלה ב-Bigtable בזמן אמת:
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
ייצוא נתונים לטבלת Spanner
בדוגמה הבאה מוצגת שאילתה מתמשכת שמסננת נתונים מטבלה ב-BigQuery שמקבלת מידע על נסיעות במונית בסטרימינג, ואז מייצאת את הנתונים לטבלת Spanner בזמן אמת:
EXPORT DATA OPTIONS ( format = 'CLOUD_SPANNER', uri = 'https://spanner.googleapis.com/projects/myproject/instances/myspannerinstance/databases/taxi-real-time-rides', spanner_options ="""{ "table": "rides", -- To ensure data is written to Spanner in the correct sequence -- during a continuous export, use the change_timestamp_column -- option. This should be mapped to a timestamp column from your -- BigQuery data. If your source data lacks a timestamp, the -- _CHANGE_TIMESTAMP pseudocolumn provided by the APPENDS function -- will be automatically mapped to the "change_timestamp" column. "change_timestamp_column": "change_timestamp" }""" ) AS ( SELECT ride_id, latitude, longitude, meter_reading, ride_status, passenger_count, _CHANGE_TIMESTAMP as change_timestamp FROM APPENDS( TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
כתיבת נתונים לטבלה ב-BigQuery
בדוגמה הבאה מוצגת שאילתה מתמשכת שמסננת ומשנה נתונים מטבלה ב-BigQuery שמקבלת מידע בסטרימינג על נסיעות במונית, ואז כותבת את הנתונים לטבלה אחרת ב-BigQuery בזמן אמת. כך הנתונים זמינים לניתוח נוסף בהמשך.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'dropoff';
עיבוד נתונים באמצעות מודל של Vertex AI
בדוגמה הבאה מוצגת שאילתה מתמשכת שמשתמשת במודל Vertex AI כדי ליצור מודעה לנוסעים במונית על סמך קו הרוחב וקו האורך הנוכחיים שלהם, ואז מייצאת את התוצאות לנושא Pub/Sub בזמן אמת:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, result)) AS message FROM AI.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you -- want to start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p)) AS ml_output );
שינוי ה-SQL של שאילתה מתמשכת
אי אפשר לעדכן את ה-SQL שמשמש בשאילתה מתמשכת בזמן שהשאילתה המתמשכת פועלת. צריך לבטל את העבודה של השאילתה המתמשכת, לשנות את ה-SQL ואז להתחיל עבודה חדשה של שאילתה מתמשכת מהנקודה שבה הפסקתם את העבודה המקורית של השאילתה המתמשכת.
כדי לשנות את ה-SQL שמשמש בשאילתה מתמשכת:
- מציגים את פרטי העבודה של עבודת השאילתה המתמשכת שרוצים לעדכן, ורושמים את מזהה העבודה.
- אם אפשר, כדאי להשהות את איסוף הנתונים במעלה הזרם. אם לא תעשו את זה, יכול להיות שיהיה כפילות של חלק מהנתונים כשהשאילתה המתמשכת תופעל מחדש.
- מבטלים את השאילתה המתמשכת שרוצים לשנות.
כדי לקבל את הערך
end_timeשל משימת השאילתה המתמשכת המקורית, משתמשים בתצוגהINFORMATION_SCHEMAJOBS:SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
מחליפים את מה שכתוב בשדות הבאים:
PROJECT_ID: מזהה הפרויקט.-
REGION: האזור שבו נעשה שימוש בפרויקט. -
JOB_ID: מזהה משימה של שאילתה מתמשכת שזיהיתם בשלב 1.
משנים את הצהרת ה-SQL של השאילתה המתמשכת כדי להתחיל את השאילתה המתמשכת מנקודת זמן מסוימת, באמצעות הערך
end_timeשאוחזר בשלב 5 כנקודת ההתחלה.משנים את הצהרת ה-SQL של השאילתה המתמשכת כדי לשקף את השינויים הנדרשים.
מריצים את השאילתה המתמשכת ששיניתם.
ביטול שאילתה מתמשכת
אפשר לבטל משימה של שאילתה מתמשכת בדיוק כמו כל משימה אחרת. יכול להיות שיחלפו עד דקה מרגע ביטול העבודה ועד שהשאילתה תפסיק לפעול.
אם מבטלים שאילתה ואז מפעילים אותה מחדש, השאילתה המופעלת מחדש מתנהגת כמו שאילתה חדשה ועצמאית. השאילתה שהופעלה מחדש לא מתחילה לעבד נתונים במקום שבו העבודה הקודמת נעצרה, ואי אפשר להפנות אותה לתוצאות של השאילתה הקודמת. איך מתחילים שאילתה מתמשכת מנקודת זמן מסוימת
מעקב אחרי שאילתות וטיפול בשגיאות
יכול להיות ששאילתה מתמשכת תופסק בגלל גורמים כמו חוסר עקביות בנתונים, שינויים בסכימה, שיבושים זמניים בשירות או תחזוקה. למרות שמערכת BigQuery מטפלת בחלק מהשגיאות הזמניות, הנה כמה שיטות מומלצות לשיפור העמידות של משימות: