תבנית Pub/Sub Avro ל-BigQuery

תבנית Pub/Sub Avro to BigQuery היא צינור להזרמת נתונים שקולט נתוני Avro ממנוי Pub/Sub לטבלה ב-BigQuery. כל השגיאות שמתרחשות במהלך הכתיבה לטבלה ב-BigQuery מועברות בסטרימינג לנושא לא מעובד ב-Pub/Sub.

לפני שמריצים צינור Dataflow לתרחיש הזה, כדאי לשקול אם מינוי ל-Pub/Sub BigQuery עם UDF עונה על הדרישות שלכם.

הדרישות לגבי צינורות עיבוד נתונים

  • המינוי לקלט Pub/Sub חייב להתקיים.
  • קובץ הסכימה של רשומות Avro צריך להיות קיים ב-Cloud Storage.
  • צריך לוודא שנושא ה-Pub/Sub שלא עבר עיבוד קיים.
  • מערך הנתונים ב-BigQuery שבו יאוחסנו התוצאות חייב להתקיים.

פרמטרים של תבניות

פרמטרים נדרשים

  • schemaPath: המיקום ב-Cloud Storage של קובץ סכימת Avro. לדוגמה, gs://path/to/my/schema.avsc.
  • inputSubscription: מינוי הקלט של Pub/Sub שממנו מתבצעת הקריאה. לדוגמה, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • outputTableSpec: מיקום טבלת הפלט ב-BigQuery שאליה ייכתב הפלט. לדוגמה, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.בהתאם ל-createDisposition שצוין, יכול להיות שטבלת הפלט תיצור באופן אוטומטי באמצעות סכימת Avro שסופקה על ידי המשתמש.
  • outputTopic: נושא Pub/Sub לשימוש ברשומות שלא עברו עיבוד. לדוגמה, projects/<PROJECT_ID>/topics/<TOPIC_NAME>.

פרמטרים אופציונליים

  • useStorageWriteApiAtLeastOnce: כשמשתמשים ב-Storage Write API, המאפיין הזה מציין את סמנטיקת הכתיבה. כדי להשתמש בסמנטיקה של 'לפחות פעם אחת' (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), צריך להגדיר את הפרמטר הזה לערך true. כדי להשתמש בסמנטיקה של 'פעם אחת בדיוק', מגדירים את הפרמטר לערך false. הפרמטר הזה רלוונטי רק אם הערך של useStorageWriteApi הוא true. ערך ברירת המחדל הוא false.
  • writeDisposition: הערך של BigQuery WriteDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). לדוגמה, WRITE_APPEND,‏ WRITE_EMPTY או WRITE_TRUNCATE. ברירת המחדל היא WRITE_APPEND.
  • createDisposition: ‏BigQuery CreateDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). לדוגמה, CREATE_IF_NEEDED ו-CREATE_NEVER. ברירת המחדל היא CREATE_IF_NEEDED.
  • useStorageWriteApi: אם הערך הוא true, צינור הנתונים משתמש ב-BigQuery Storage Write API‏ (https://cloud.google.com/bigquery/docs/write-api). ערך ברירת המחדל הוא false. מידע נוסף זמין במאמר בנושא שימוש ב-Storage Write API‏ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: כשמשתמשים ב-Storage Write API, מציינים את מספר זרמי הכתיבה. אם useStorageWriteApi הוא true ו-useStorageWriteApiAtLeastOnce הוא false, חובה להגדיר את הפרמטר הזה. ברירת המחדל היא 0.
  • storageWriteApiTriggeringFrequencySec: כשמשתמשים ב-Storage Write API, הפרמטר הזה מציין את תדירות ההפעלה בשניות. אם useStorageWriteApi הוא true ו-useStorageWriteApiAtLeastOnce הוא false, חובה להגדיר את הפרמטר הזה.

הפעלת התבנית

המסוף

  1. עוברים לדף Dataflow Create job from template (יצירת משימה מתבנית).
  2. כניסה לדף Create job from template
  3. בשדה שם המשימה, מזינים שם ייחודי למשימה.
  4. אופציונלי: בשדה Regional endpoint (נקודת קצה אזורית), בוחרים ערך מהתפריט הנפתח. אזור ברירת המחדל הוא us-central1.

    רשימת האזורים שבהם אפשר להריץ משימת Dataflow מופיעה במאמר מיקומי Dataflow.

  5. בתפריט הנפתח Dataflow template (תבנית של העברת נתונים), בוחרים באפשרות the Pub/Sub Avro to BigQuery template.
  6. בשדות הפרמטרים שמופיעים, מזינים את ערכי הפרמטרים.
  7. לוחצים על הפעלת העבודה.

gcloud

במעטפת או בטרמינל, מריצים את התבנית:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \
    --parameters \
schemaPath=SCHEMA_PATH,\
inputSubscription=SUBSCRIPTION_NAME,\
outputTableSpec=BIGQUERY_TABLE,\
outputTopic=DEADLETTER_TOPIC
  

מחליפים את מה שכתוב בשדות הבאים:

  • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
  • REGION_NAME: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
  • VERSION: הגרסה של התבנית שבה רוצים להשתמש

    אפשר להשתמש בערכים הבאים:

    • latest כדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/
    • שם הגרסה, כמו 2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
  • SCHEMA_PATH: הנתיב ב-Cloud Storage לקובץ סכימת Avro (לדוגמה, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: שם המינוי לקלט Pub/Sub
  • BIGQUERY_TABLE: שם טבלת הפלט ב-BigQuery
  • DEADLETTER_TOPIC: נושא ה-Pub/Sub שבו רוצים להשתמש לתור של פריטים שלא עברו עיבוד

API

כדי להריץ את התבנית באמצעות API בארכיטקטורת REST, שולחים בקשת HTTP POST. מידע נוסף על ה-API ועל היקפי ההרשאות שלו זמין במאמר projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery",
      "parameters": {
          "schemaPath": "SCHEMA_PATH",
          "inputSubscription": "SUBSCRIPTION_NAME",
          "outputTableSpec": "BIGQUERY_TABLE",
          "outputTopic": "DEADLETTER_TOPIC"
      }
   }
}
  

מחליפים את מה שכתוב בשדות הבאים:

  • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
  • LOCATION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
  • VERSION: הגרסה של התבנית שבה רוצים להשתמש

    אפשר להשתמש בערכים הבאים:

    • latest כדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/
    • שם הגרסה, כמו 2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
  • SCHEMA_PATH: הנתיב ב-Cloud Storage לקובץ סכימת Avro (לדוגמה, gs://MyBucket/file.avsc)
  • SUBSCRIPTION_NAME: שם המינוי לקלט Pub/Sub
  • BIGQUERY_TABLE: שם טבלת הפלט ב-BigQuery
  • DEADLETTER_TOPIC: נושא ה-Pub/Sub שבו רוצים להשתמש לתור של פריטים שלא עברו עיבוד

המאמרים הבאים