ייצוא נתונים ל-Pub/Sub (תהליך ETL הפוך)

כדי לייצא נתונים ל-Pub/Sub, צריך להשתמש בשאילתות רציפות של BigQuery.

במאמר הזה מוסבר איך להגדיר תהליך הפוך של שליפה, טרנספורמציה וטעינה (RETL) מ-BigQuery אל Pub/Sub. כדי לעשות את זה, אפשר להשתמש בהצהרת EXPORT DATA בשאילתה מתמשכת כדי לייצא נתונים מ-BigQuery אל נושא ב-Pub/Sub.

אתם יכולים להשתמש בתהליך עבודה של RETL ב-Pub/Sub כדי לשלב את יכולות הניתוח של BigQuery עם שירות ההודעות הגלובלי האסינכרוני והניתן להרחבה של Pub/Sub. תהליך העבודה הזה מאפשר לכם להעביר נתונים לאפליקציות ולשירותים במורד הזרם באופן מבוסס-אירועים.

דרישות מוקדמות

אתם צריכים ליצור חשבון שירות. כדי להריץ שאילתה מתמשכת שמייצאת תוצאות לנושא Pub/Sub, צריך חשבון שירות.

צריך ליצור נושא ב-Pub/Sub כדי לקבל את התוצאות של השאילתה הרציפה כהודעות, ומינוי ל-Pub/Sub שאפליקציית היעד יכולה להשתמש בו כדי לקבל את ההודעות האלה.

התפקידים הנדרשים

בקטע הזה מוסבר על התפקידים וההרשאות שנדרשים לחשבון המשתמש שיוצר את השאילתה המתמשכת ולחשבון השירות שמריץ את השאילתה המתמשכת.

הרשאות בחשבון משתמש

כדי ליצור משימה ב-BigQuery, לחשבון המשתמש צריכה להיות הרשאת IAM‏ bigquery.jobs.create. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.jobs.create:

כדי לשלוח עבודה שמופעלת באמצעות חשבון שירות, לחשבון המשתמש צריך להיות התפקיד משתמש בחשבון שירות (roles/iam.serviceAccountUser). אם אתם משתמשים באותו חשבון משתמש כדי ליצור את חשבון השירות, לחשבון המשתמש צריך להיות התפקיד אדמין של חשבון שירות (roles/iam.serviceAccountAdmin). במאמר הקצאת תפקיד יחיד מוסבר איך להגביל את הגישה של משתמש לחשבון שירות יחיד, ולא לכל חשבונות השירות בפרויקט.

אם חשבון המשתמש צריך להפעיל את ממשקי ה-API שנדרשים לתרחיש השימוש שלכם בשאילתה מתמשכת, לחשבון המשתמש צריך להיות התפקיד אדמין של שימוש בשירות (roles/serviceusage.serviceUsageAdmin).

הרשאות של חשבון שירות

כדי לייצא נתונים מטבלה ב-BigQuery, לחשבון השירות צריכה להיות הרשאת IAM‏ bigquery.tables.export. כל אחד מהתפקידים הבאים ב-IAM מעניק את ההרשאה bigquery.tables.export:

כדי שלחשבון השירות תהיה גישה ל-Pub/Sub, צריך להקצות לחשבון השירות את שני תפקידי ה-IAM הבאים:

יכול להיות שאפשר לקבל את ההרשאות הנדרשות גם באמצעות תפקידים בהתאמה אישית.

לפני שמתחילים

Enable the BigQuery and Pub/Sub APIs.

Roles required to enable APIs

To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

Enable the APIs

ייצוא ל-Pub/Sub

כדי לייצא נתונים לנושא Pub/Sub, משתמשים בהצהרה EXPORT DATA:

המסוף

  1. במסוף Google Cloud , עוברים לדף BigQuery.

    כניסה ל-BigQuery

  2. בעורך השאילתות, לוחצים על More > Query settings.

  3. בקטע Continuous Query (שאילתה מתמשכת), מסמנים את התיבה Use continuous query mode (שימוש במצב שאילתה מתמשכת).

  4. בתיבה Service account, בוחרים את חשבון השירות שיצרתם.

  5. לוחצים על Save.

  6. מזינים את ההצהרה הבאה בעורך השאילתות:

    EXPORT DATA
    OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID'
    ) AS
    (
    QUERY
    );

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • TOPIC_ID: מזהה נושא ה-Pub/Sub. אפשר לקבל את מזהה הנושא מהדף Topics במסוף Google Cloud .
    • QUERY: הצהרת ה-SQL לבחירת הנתונים לייצוא. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות. כדי לציין את נקודת הזמן שבה יתחיל עיבוד הנתונים, צריך להשתמש בפונקציה APPENDS בסעיף FROM של שאילתה מתמשכת.
  7. לוחצים על Run.

BQ

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  2. בשורת הפקודה, מריצים את השאילתה המתמשכת באמצעות הפקודה bq query עם הדגלים הבאים:

    • מגדירים את הדגל --continuous לערך true כדי שהשאילתה תהיה רציפה.
    • משתמשים בדגל --connection_property כדי לציין חשבון שירות לשימוש.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'EXPORT DATA OPTIONS (format = "CLOUD_PUBSUB", uri = "https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID") AS (QUERY);'

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט.
    • SERVICE_ACCOUNT_EMAIL: כתובת האימייל של חשבון השירות. אפשר למצוא את כתובת האימייל של חשבון השירות בדף Service accounts במסוף Google Cloud .
    • QUERY: הצהרת ה-SQL לבחירת הנתונים לייצוא. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות. כדי לציין את נקודת הזמן שבה יתחיל עיבוד הנתונים, צריך להשתמש בפונקציה APPENDS בסעיף FROM של שאילתה מתמשכת.
  3. API

    1. מריצים את השאילתה המתמשכת על ידי קריאה ל-method‏ jobs.insert. מגדירים את השדות הבאים במשאב JobConfigurationQuery של משאב Job שמעבירים:

      • מגדירים את השדה continuous לערך true כדי שהשאילתה תפעל ברציפות.
      • משתמשים בשדה connection_property כדי לציין חשבון שירות לשימוש.
      curl --request POST \
        'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs'
        --header 'Authorization: Bearer $(gcloud auth print-access-token) \
        --header 'Accept: application/json' \
        --header 'Content-Type: application/json' \
        --data '("configuration":("query":"EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID') AS (QUERY);","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
        --compressed

      מחליפים את מה שכתוב בשדות הבאים:

      • PROJECT_ID: מזהה הפרויקט.
      • QUERY: הצהרת ה-SQL לבחירת הנתונים לייצוא. הצהרת ה-SQL חייבת להכיל רק פעולות נתמכות. כדי לציין את נקודת הזמן שבה יתחיל עיבוד הנתונים, צריך להשתמש בפונקציה APPENDS בסעיף FROM של שאילתה מתמשכת.
      • SERVICE_ACCOUNT_EMAIL: כתובת האימייל של חשבון השירות. אפשר למצוא את כתובת האימייל של חשבון השירות בדף Service accounts במסוף Google Cloud .

ייצוא של כמה עמודות ל-Pub/Sub

אם רוצים לכלול כמה עמודות בפלט, אפשר ליצור עמודת struct שתכיל את ערכי העמודות, ואז להמיר את ערך ה-struct למחרוזת JSON באמצעות הפונקציה TO_JSON_STRING. בדוגמה הבאה מיוצאים נתונים מארבע עמודות, בפורמט של מחרוזת JSON:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

אופטימיזציה של ייצוא

אם נראה שהביצועים של משימת השאילתה המתמשכת מוגבלים בגלל משאבי מחשוב זמינים, כדאי לנסות להגדיל את גודל הקצאת יחידות הקיבולת (slots) של BigQueryCONTINUOUS.

מגבלות

  • הנתונים המיוצאים צריכים לכלול עמודה אחת של STRING או של BYTES. שם העמודה יכול להיות כל שם שתבחרו.
  • כדי לייצא ל-Pub/Sub, צריך להשתמש בשאילתה מתמשכת.
  • אי אפשר להעביר סכימה לנושא Pub/Sub בשאילתה מתמשכת.
  • אי אפשר לייצא נתונים לנושא Pub/Sub שמשתמש בסכימה.
  • כשמייצאים ל-Pub/Sub, אפשר לייצא רשומות בפורמט JSON שחלק מהערכים שלהן הם NULL, אבל אי אפשר לייצא רשומות שמורכבות רק מערכים של NULL. אפשר להחריג רשומות NULL מתוצאות השאילתה על ידי הוספת מסנן WHERE message IS NOT NULL לשאילתה מתמשכת.
  • כשמייצאים נתונים לנושא ב-Pub/Sub שהוגדר עם נקודת קצה אזורית, צריך להגדיר את נקודת הקצה באותו גבול אזורי של מערך הנתונים ב-BigQuery שמכיל את הטבלה ששולחים לה שאילתה. Google Cloud
  • הנתונים המיוצאים לא יכולים לחרוג מהמכסות של Pub/Sub.

תמחור

כשמייצאים נתונים בשאילתה מתמשכת, החיוב מתבצע לפי תמחור קיבולת החישוב של BigQuery. כדי להריץ שאילתות רציפות, צריך הזמנה שמשתמשת במהדורות Enterprise או Enterprise Plus, והקצאת הזמנה שמשתמשת בסוג המשימה CONTINUOUS.

אחרי ייצוא הנתונים, תחויבו על השימוש ב-Pub/Sub. מידע נוסף זמין במאמר בנושא תמחור של Pub/Sub.