תבנית Pub/Sub Avro to BigQuery היא צינור להזרמת נתונים שקולט נתוני Avro ממנוי Pub/Sub לטבלה ב-BigQuery. כל השגיאות שמתרחשות במהלך הכתיבה לטבלה ב-BigQuery מועברות בסטרימינג לנושא לא מעובד ב-Pub/Sub.
לפני שמריצים צינור Dataflow לתרחיש הזה, כדאי לשקול אם מינוי ל-Pub/Sub BigQuery עם UDF עונה על הדרישות שלכם.
הדרישות לגבי צינורות עיבוד נתונים
- המינוי לקלט Pub/Sub חייב להתקיים.
- קובץ הסכימה של רשומות Avro צריך להיות קיים ב-Cloud Storage.
- צריך לוודא שנושא ה-Pub/Sub שלא עבר עיבוד קיים.
- מערך הנתונים ב-BigQuery שבו יאוחסנו התוצאות חייב להתקיים.
פרמטרים של תבניות
פרמטרים נדרשים
- schemaPath: המיקום ב-Cloud Storage של קובץ סכימת Avro. לדוגמה,
gs://path/to/my/schema.avsc. - inputSubscription: מינוי הקלט של Pub/Sub שממנו מתבצעת הקריאה. לדוגמה,
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>. - outputTableSpec: מיקום טבלת הפלט ב-BigQuery שאליה ייכתב הפלט. לדוגמה,
<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.בהתאם ל-createDispositionשצוין, יכול להיות שטבלת הפלט תיצור באופן אוטומטי באמצעות סכימת Avro שסופקה על ידי המשתמש. - outputTopic: נושא Pub/Sub לשימוש ברשומות שלא עברו עיבוד. לדוגמה,
projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
פרמטרים אופציונליים
- useStorageWriteApiAtLeastOnce: כשמשתמשים ב-Storage Write API, המאפיין הזה מציין את סמנטיקת הכתיבה. כדי להשתמש בסמנטיקה של 'לפחות פעם אחת' (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), צריך להגדיר את הפרמטר הזה לערך true. כדי להשתמש בסמנטיקה של 'פעם אחת בדיוק', מגדירים את הפרמטר לערך
false. הפרמטר הזה רלוונטי רק אם הערך שלuseStorageWriteApiהואtrue. ערך ברירת המחדל הואfalse. - writeDisposition: הערך של BigQuery WriteDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). לדוגמה,
WRITE_APPEND,WRITE_EMPTYאוWRITE_TRUNCATE. ברירת המחדל היאWRITE_APPEND. - createDisposition: BigQuery CreateDisposition (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload). לדוגמה,
CREATE_IF_NEEDEDו-CREATE_NEVER. ברירת המחדל היאCREATE_IF_NEEDED. - useStorageWriteApi: אם הערך הוא true, צינור הנתונים משתמש ב-BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). ערך ברירת המחדל הוא
false. מידע נוסף זמין במאמר בנושא שימוש ב-Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api). - numStorageWriteApiStreams: כשמשתמשים ב-Storage Write API, מציינים את מספר זרמי הכתיבה. אם
useStorageWriteApiהואtrueו-useStorageWriteApiAtLeastOnceהואfalse, חובה להגדיר את הפרמטר הזה. ברירת המחדל היא 0. - storageWriteApiTriggeringFrequencySec: כשמשתמשים ב-Storage Write API, הפרמטר הזה מציין את תדירות ההפעלה בשניות. אם
useStorageWriteApiהואtrueו-useStorageWriteApiAtLeastOnceהואfalse, חובה להגדיר את הפרמטר הזה.
הפעלת התבנית
המסוף
- עוברים לדף Dataflow Create job from template (יצירת משימה מתבנית). כניסה לדף Create job from template
- בשדה שם המשימה, מזינים שם ייחודי למשימה.
- אופציונלי: בשדה Regional endpoint (נקודת קצה אזורית), בוחרים ערך מהתפריט הנפתח. אזור ברירת המחדל הוא
us-central1.רשימת האזורים שבהם אפשר להריץ משימת Dataflow מופיעה במאמר מיקומי Dataflow.
- בתפריט הנפתח Dataflow template (תבנית של העברת נתונים), בוחרים באפשרות the Pub/Sub Avro to BigQuery template.
- בשדות הפרמטרים שמופיעים, מזינים את ערכי הפרמטרים.
- לוחצים על הפעלת העבודה.
gcloud
במעטפת או בטרמינל, מריצים את התבנית:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Avro_to_BigQuery \ --parameters \ schemaPath=SCHEMA_PATH,\ inputSubscription=SUBSCRIPTION_NAME,\ outputTableSpec=BIGQUERY_TABLE,\ outputTopic=DEADLETTER_TOPIC
מחליפים את מה שכתוב בשדות הבאים:
-
JOB_NAME: שם ייחודי של המשימה לפי בחירתכם -
REGION_NAME: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה:us-central1 -
VERSION: הגרסה של התבנית שבה רוצים להשתמשאפשר להשתמש בערכים הבאים:
-
latestכדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/ - שם הגרסה, כמו
2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
-
-
SCHEMA_PATH: הנתיב ב-Cloud Storage לקובץ סכימת Avro (לדוגמה,gs://MyBucket/file.avsc) -
SUBSCRIPTION_NAME: שם המינוי לקלט Pub/Sub -
BIGQUERY_TABLE: שם טבלת הפלט ב-BigQuery -
DEADLETTER_TOPIC: נושא ה-Pub/Sub שבו רוצים להשתמש לתור של פריטים שלא עברו עיבוד
API
כדי להריץ את התבנית באמצעות API בארכיטקטורת REST, שולחים בקשת HTTP POST. מידע נוסף על ה-API ועל היקפי ההרשאות שלו זמין במאמר projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_Avro_to_BigQuery", "parameters": { "schemaPath": "SCHEMA_PATH", "inputSubscription": "SUBSCRIPTION_NAME", "outputTableSpec": "BIGQUERY_TABLE", "outputTopic": "DEADLETTER_TOPIC" } } }
מחליפים את מה שכתוב בשדות הבאים:
-
JOB_NAME: שם ייחודי של המשימה לפי בחירתכם -
LOCATION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה:us-central1 -
VERSION: הגרסה של התבנית שבה רוצים להשתמשאפשר להשתמש בערכים הבאים:
-
latestכדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/ - שם הגרסה, כמו
2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
-
-
SCHEMA_PATH: הנתיב ב-Cloud Storage לקובץ סכימת Avro (לדוגמה,gs://MyBucket/file.avsc) -
SUBSCRIPTION_NAME: שם המינוי לקלט Pub/Sub -
BIGQUERY_TABLE: שם טבלת הפלט ב-BigQuery -
DEADLETTER_TOPIC: נושא ה-Pub/Sub שבו רוצים להשתמש לתור של פריטים שלא עברו עיבוד