Pub/Sub to BigQuery with Python UDF template

התבנית Pub/Sub to BigQuery with Python UDF היא צינור סטרימינג שקורא הודעות בפורמט JSON מ-Pub/Sub וכותב אותן לטבלה ב-BigQuery. אפשר גם לספק פונקציה בהגדרת המשתמש (UDF) שנכתבה ב-Python כדי לעבד את ההודעות הנכנסות.

הדרישות לגבי צינורות עיבוד נתונים

  • הטבלה ב-BigQuery צריכה להיות קיימת ולהיות לה סכימה.
  • נתוני ההודעה ב-Pub/Sub צריכים להיות בפורמט JSON, או שצריך לספק UDF שממיר את נתוני ההודעה ל-JSON. נתוני ה-JSON צריכים להתאים לסכימת הטבלה ב-BigQuery. לדוגמה, אם מטען ה-JSON (payload) מעוצב כ-{"k1":"v1", "k2":"v2"}, בטבלת BigQuery צריכות להיות שתי עמודות מחרוזת בשמות k1 ו-k2.
  • מציינים את הפרמטר inputSubscription או inputTopic, אבל לא את שניהם.

פרמטרים של תבניות

פרמטרים נדרשים

  • outputTableSpec: הטבלה ב-BigQuery שאליה ייכתבו הנתונים, בפורמט PROJECT_ID:DATASET_NAME.TABLE_NAME.

פרמטרים אופציונליים

  • inputTopic: נושא Pub/Sub לקריאה, בפורמט projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • inputSubscription: המינוי ל-Pub/Sub שממנו קוראים, בפורמט projects/<PROJECT_ID>/subscriptions/<SUBCRIPTION_NAME>.
  • outputDeadletterTable: הטבלה ב-BigQuery שבה יישמרו הודעות שלא הצליחו להגיע לטבלת הפלט, בפורמט PROJECT_ID:DATASET_NAME.TABLE_NAME. אם הטבלה לא קיימת, היא נוצרת כשהצינור מופעל. אם לא מציינים את הפרמטר הזה, המערכת משתמשת בערך OUTPUT_TABLE_SPEC_error_records.
  • useStorageWriteApiAtLeastOnce: כשמשתמשים ב-Storage Write API, המאפיין הזה מציין את סמנטיקת הכתיבה. כדי להשתמש בסמנטיקה של 'לפחות פעם אחת' (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), צריך להגדיר את הפרמטר הזה לערך true. כדי להשתמש בסמנטיקה של 'פעם אחת בדיוק', מגדירים את הפרמטר לערך false. הפרמטר הזה רלוונטי רק אם הערך של useStorageWriteApi הוא true. ערך ברירת המחדל הוא false.
  • useStorageWriteApi: אם הערך הוא true, צינור הנתונים משתמש ב-BigQuery Storage Write API‏ (https://cloud.google.com/bigquery/docs/write-api). ערך ברירת המחדל הוא false. מידע נוסף זמין במאמר בנושא שימוש ב-Storage Write API‏ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: כשמשתמשים ב-Storage Write API, מציינים את מספר זרמי הכתיבה. אם useStorageWriteApi הוא true ו-useStorageWriteApiAtLeastOnce הוא false, חובה להגדיר את הפרמטר הזה. ברירת המחדל היא 0.
  • storageWriteApiTriggeringFrequencySec: כשמשתמשים ב-Storage Write API, הפרמטר הזה מציין את תדירות ההפעלה בשניות. אם useStorageWriteApi הוא true ו-useStorageWriteApiAtLeastOnce הוא false, חובה להגדיר את הפרמטר הזה.
  • pythonExternalTextTransformGcsPath: תבנית הנתיב ב-Cloud Storage לקוד Python שמכיל את הפונקציות המוגדרות על ידי המשתמש. לדוגמה, gs://your-bucket/your-function.py.
  • pythonExternalTextTransformFunctionName: שם הפונקציה לקריאה מקובץ Python. אפשר להשתמש רק באותיות, ספרות וקווים תחתונים. לדוגמה, 'transform' or 'transform_udf1'.

פונקציה בהגדרת המשתמש

אפשר גם להרחיב את התבנית הזו על ידי כתיבת פונקציה בהגדרת המשתמש (UDF). התבנית קוראת ל-UDF עבור כל רכיב קלט. מטענים ייעודיים של רכיבים עוברים סריאליזציה כמחרוזות JSON. למידע נוסף, ראו יצירת פונקציות מוגדרות על ידי המשתמש לתבניות Dataflow.

מפרט הפונקציה

המאפיינים של פונקציית UDF:

  • קלט: שדה הנתונים של הודעת Pub/Sub, שעבר סריאליזציה כמחרוזת JSON.
  • פלט: מחרוזת JSON שתואמת לסכימה של טבלת היעד ב-BigQuery.
  • הפעלת התבנית

    המסוף

    1. עוברים לדף Dataflow Create job from template (יצירת משימה מתבנית).
    2. כניסה לדף Create job from template
    3. בשדה שם המשימה, מזינים שם ייחודי למשימה.
    4. אופציונלי: בשדה Regional endpoint (נקודת קצה אזורית), בוחרים ערך מהתפריט הנפתח. אזור ברירת המחדל הוא us-central1.

      רשימת האזורים שבהם אפשר להריץ משימת Dataflow מופיעה במאמר מיקומי Dataflow.

    5. בתפריט הנפתח Dataflow template (תבנית של העברת נתונים), בוחרים באפשרות the Pub/Sub to BigQuery with Python UDF template.
    6. בשדות הפרמטרים שמופיעים, מזינים את ערכי הפרמטרים.
    7. אופציונלי: כדי לעבור מעיבוד של כל נתון בדיוק פעם אחת אל מצב סטרימינג של כל נתון לפחות פעם אחת, בוחרים באפשרות לפחות פעם אחת.
    8. לוחצים על הפעלת העבודה.

    gcloud

    במעטפת או בטרמינל, מריצים את התבנית:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Xlang \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    מחליפים את מה שכתוב בשדות הבאים:

    • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
    • REGION_NAME: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
    • VERSION: הגרסה של התבנית שבה רוצים להשתמש

      אפשר להשתמש בערכים הבאים:

      • latest כדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/
      • שם הגרסה, כמו 2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
    • STAGING_LOCATION: המיקום של קבצים מקומיים להעברה (לדוגמה, gs://your-bucket/staging)
    • TOPIC_NAME: השם של נושא ה-Pub/Sub
    • DATASET: מערך הנתונים שלכם ב-BigQuery
    • TABLE_NAME: שם הטבלה ב-BigQuery

    API

    כדי להריץ את התבנית באמצעות API בארכיטקטורת REST, שולחים בקשת HTTP POST. מידע נוסף על ה-API ועל היקפי ההרשאות שלו זמין במאמר projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Xlang",
       }
    }

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud
    • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
    • LOCATION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
    • VERSION: הגרסה של התבנית שבה רוצים להשתמש

      אפשר להשתמש בערכים הבאים:

      • latest כדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/
      • שם הגרסה, כמו 2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
    • STAGING_LOCATION: המיקום של קבצים מקומיים להעברה (לדוגמה, gs://your-bucket/staging)
    • TOPIC_NAME: השם של נושא ה-Pub/Sub
    • DATASET: מערך הנתונים שלכם ב-BigQuery
    • TABLE_NAME: שם הטבלה ב-BigQuery

    המאמרים הבאים