תבנית של פונקציה מוגדרת על ידי המשתמש (UDF) ב-Python להעברה מ-Pub/Sub Proto ל-BigQuery

תבנית ה-proto מ-Pub/Sub ל-BigQuery היא צינור להעברת נתונים בזמן אמת, שקולט נתוני proto ממנוי Pub/Sub לטבלה ב-BigQuery. כל השגיאות שמתרחשות במהלך הכתיבה לטבלה ב-BigQuery מועברות בסטרימינג לנושא לא מעובד ב-Pub/Sub.

אפשר לספק פונקציה בהגדרת המשתמש (UDF) ב-Python כדי לבצע טרנספורמציה של נתונים. שגיאות במהלך ההפעלה של ה-UDF יכולות להישלח לנושא נפרד ב-Pub/Sub או לאותו נושא לא מעובד כמו השגיאות ב-BigQuery.

לפני שמריצים צינור Dataflow לתרחיש הזה, כדאי לשקול אם מינוי ל-Pub/Sub BigQuery עם UDF עונה על הדרישות שלכם.

הדרישות לגבי צינורות עיבוד נתונים

  • המינוי לקלט Pub/Sub חייב להתקיים.
  • קובץ הסכימה של רשומות ה-Proto צריך להיות קיים ב-Cloud Storage.
  • נושא הפלט ב-Pub/Sub חייב להתקיים.
  • מערך הנתונים ב-BigQuery שבו יאוחסנו התוצאות חייב להתקיים.
  • אם הטבלה ב-BigQuery קיימת, היא צריכה לכלול סכימה שתואמת לנתוני הפרוטו, ללא קשר לערך של createDisposition.

פרמטרים של תבניות

פרמטר תיאור
protoSchemaPath המיקום ב-Cloud Storage של קובץ סכמת ה-proto העצמאי. לדוגמה, gs://path/to/my/file.pb. אפשר ליצור את הקובץ הזה באמצעות הדגל --descriptor_set_out של הפקודה protoc. הדגל --include_imports מבטיח שהקובץ יהיה עצמאי.
fullMessageName השם המלא של הודעת ה-proto. לדוגמה, package.name.MessageName, כאשר package.name הוא הערך שצוין עבור ההצהרה package ולא עבור ההצהרה java_package.
inputSubscription מינוי הקלט של Pub/Sub שממנו רוצים לקרוא. לדוגמה, projects/<project>/subscriptions/<subscription>.
outputTopic נושא Pub/Sub לשימוש ברשומות שלא עברו עיבוד. לדוגמה, projects/<project-id>/topics/<topic-name>.
outputTableSpec המיקום של טבלת הפלט ב-BigQuery. לדוגמה, my-project:my_dataset.my_table. בהתאם ל-createDisposition שצוין, יכול להיות שטבלת הפלט תיצור באופן אוטומטי באמצעות קובץ סכימת הקלט.
preserveProtoFieldNames אופציונלי: true כדי לשמור את השם המקורי של שדה Proto ב-JSON. ‫false כדי להשתמש בשמות JSON סטנדרטיים יותר. לדוגמה, הפקודה false תשנה את field_name ל-fieldName. (ברירת מחדל: false)
bigQueryTableSchemaPath אופציונלי: נתיב Cloud Storage לנתיב סכימת BigQuery. לדוגמה, gs://path/to/my/schema.json. אם לא מציינים את זה, הסכימה מוסקת מסכימת ה-Proto.
pythonExternalTextTransformGcsPath אופציונלי: ה-URI של Cloud Storage של קובץ קוד Python שמגדיר את הפונקציה בהגדרת המשתמש (UDF) שרוצים להשתמש בה. לדוגמה, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName אופציונלי: השם של פונקציה בהגדרת המשתמש (UDF) ב-Python שרוצים להשתמש בה.
udfOutputTopic אופציונלי: נושא Pub/Sub שבו מאוחסנות שגיאות של פונקציות מוגדרות על ידי המשתמש. לדוגמה, projects/<project-id>/topics/<topic-name>. אם לא מציינים את הנושא הזה, שגיאות ב-UDF נשלחות לאותו נושא כמו outputTopic.
writeDisposition אופציונלי: מזהה BigQuery WriteDisposition. לדוגמה, WRITE_APPEND, ‏ WRITE_EMPTY או WRITE_TRUNCATE. ברירת מחדל: WRITE_APPEND.
createDisposition אופציונלי: מזהה BigQuery CreateDisposition. לדוגמה, CREATE_IF_NEEDED, CREATE_NEVER. ברירת מחדל: CREATE_IF_NEEDED.
useStorageWriteApi אופציונלי: אם true, צינור הנתונים משתמש ב- BigQuery Storage Write API. ערך ברירת המחדל הוא false. מידע נוסף מופיע במאמר בנושא שימוש ב-Storage Write API.
useStorageWriteApiAtLeastOnce אופציונלי: כשמשתמשים ב-Storage Write API, מציינים את סמנטיקת הכתיבה. כדי להשתמש ב סמנטיקה של 'לפחות פעם אחת', צריך להגדיר את הפרמטר הזה לערך true. כדי להשתמש בסמנטיקה של שליחה בדיוק פעם אחת, צריך להגדיר את הפרמטר לערך false. הפרמטר הזה רלוונטי רק אם הערך של useStorageWriteApi הוא true. ערך ברירת המחדל הוא false.
numStorageWriteApiStreams אופציונלי: כשמשתמשים ב-Storage Write API, מציינים את מספר זרמי הכתיבה. אם useStorageWriteApi הוא true ו-useStorageWriteApiAtLeastOnce הוא false, חובה להגדיר את הפרמטר הזה.
storageWriteApiTriggeringFrequencySec אופציונלי: כשמשתמשים ב-Storage Write API, מציינים את תדירות ההפעלה בשניות. אם useStorageWriteApi הוא true ו-useStorageWriteApiAtLeastOnce הוא false, חובה להגדיר את הפרמטר הזה.

פונקציה בהגדרת המשתמש

אפשר גם להרחיב את התבנית הזו על ידי כתיבת פונקציה בהגדרת המשתמש (UDF). התבנית קוראת ל-UDF עבור כל רכיב קלט. מטענים ייעודיים של רכיבים עוברים סריאליזציה כמחרוזות JSON. למידע נוסף, ראו יצירת פונקציות מוגדרות על ידי המשתמש לתבניות Dataflow.

מפרט הפונקציה

המאפיינים של פונקציית UDF:

  • קלט: שדה הנתונים של הודעת Pub/Sub, שעבר סריאליזציה כמחרוזת JSON.
  • פלט: מחרוזת JSON שתואמת לסכימה של טבלת היעד ב-BigQuery.
  • הפעלת התבנית

    המסוף

    1. עוברים לדף Dataflow Create job from template (יצירת משימה מתבנית).
    2. כניסה לדף Create job from template
    3. בשדה שם המשימה, מזינים שם ייחודי למשימה.
    4. אופציונלי: בשדה Regional endpoint (נקודת קצה אזורית), בוחרים ערך מהתפריט הנפתח. אזור ברירת המחדל הוא us-central1.

      רשימת האזורים שבהם אפשר להריץ משימת Dataflow מופיעה במאמר מיקומי Dataflow.

    5. בתפריט הנפתח Dataflow template (תבנית של העברת נתונים), בוחרים באפשרות the Pub/Sub Proto to BigQuery with Python UDF template.
    6. בשדות הפרמטרים שמופיעים, מזינים את ערכי הפרמטרים.
    7. לוחצים על הפעלת העבודה.

    gcloud

    במעטפת או בטרמינל, מריצים את התבנית:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    מחליפים את מה שכתוב בשדות הבאים:

    • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
    • REGION_NAME: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
    • VERSION: הגרסה של התבנית שבה רוצים להשתמש

      אפשר להשתמש בערכים הבאים:

      • latest כדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/
      • שם הגרסה, כמו 2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
    • SCHEMA_PATH: הנתיב ב-Cloud Storage לקובץ סכימת ה-Proto (לדוגמה, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: שם הודעת ה-Proto (לדוגמה, package.name.MessageName)
    • SUBSCRIPTION_NAME: שם המינוי לקלט Pub/Sub
    • BIGQUERY_TABLE: שם טבלת הפלט ב-BigQuery
    • UNPROCESSED_TOPIC: נושא ה-Pub/Sub שבו רוצים להשתמש לתור של פריטים שלא עברו עיבוד

    API

    כדי להריץ את התבנית באמצעות API בארכיטקטורת REST, שולחים בקשת HTTP POST. מידע נוסף על ה-API ועל היקפי ההרשאות שלו זמין במאמר projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Xlang",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud
    • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
    • LOCATION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
    • VERSION: הגרסה של התבנית שבה רוצים להשתמש

      אפשר להשתמש בערכים הבאים:

      • latest כדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/
      • שם הגרסה, כמו 2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
    • SCHEMA_PATH: הנתיב ב-Cloud Storage לקובץ סכימת ה-Proto (לדוגמה, gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME: שם הודעת ה-Proto (לדוגמה, package.name.MessageName)
    • SUBSCRIPTION_NAME: שם המינוי לקלט Pub/Sub
    • BIGQUERY_TABLE: שם טבלת הפלט ב-BigQuery
    • UNPROCESSED_TOPIC: נושא ה-Pub/Sub שבו רוצים להשתמש לתור של פריטים שלא עברו עיבוד

    המאמרים הבאים