תבנית של מחולל נתונים להזנת זרם נתונים ל-Pub/Sub, ל-BigQuery ול-Cloud Storage

תבנית מחולל נתוני סטרימינג משמשת ליצירת מספר בלתי מוגבל או קבוע של רשומות או הודעות סינתטיות על סמך סכימה שסופקה על ידי המשתמש בקצב שצוין. היעדים התואמים כוללים נושאים ב-Pub/Sub, טבלאות ב-BigQuery וקטגוריות אחסון ב-Cloud Storage.

הנה כמה תרחישי שימוש אפשריים:

  • מדמים פרסום של אירועים בזמן אמת בהיקף גדול בנושא Pub/Sub כדי למדוד ולקבוע את המספר והגודל של הצרכנים שנדרשים לעיבוד האירועים שפורסמו.
  • יצירת נתונים סינתטיים בטבלה ב-BigQuery או בקטגוריה של Cloud Storage כדי להעריך את מדדי הביצועים או כדי להוכיח את היתכנות הרעיון.

פורמטים נתמכים של קידוד ויעדים

בטבלה הבאה מפורטים מאגרי הנתונים ופורמטי הקידוד שנתמכים בתבנית הזו:
JSON Avro Parquet
Pub/Sub כן כן לא
BigQuery כן לא לא
Cloud Storage כן כן כן

הדרישות לגבי צינורות עיבוד נתונים

  • לחשבון השירות של העובד צריך להיות מוקצה התפקיד Dataflow Worker ‏ (roles/dataflow.worker). מידע נוסף זמין במאמר מבוא ל-IAM.
  • יוצרים קובץ סכימה שמכיל תבנית JSON לנתונים שנוצרו. התבנית הזו משתמשת בספרייה JSON Data Generator, כך שאפשר לספק פונקציות שונות של faker לכל שדה בסכימה. מידע נוסף מופיע במסמכי התיעוד בנושא json-data-generator.

    לדוגמה:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • מעלים את קובץ הסכימה לקטגוריה של Cloud Storage.
  • יעד הפלט חייב להתקיים לפני ההרצה. יעד הניקוז חייב להיות נושא Pub/Sub, טבלה ב-BigQuery או קטגוריה של Cloud Storage, בהתאם לסוג הניקוז.
  • אם קידוד הפלט הוא Avro או Parquet, צריך ליצור קובץ סכימת Avro ולאחסן אותו במיקום ב-Cloud Storage.
  • מקצים לחשבון השירות של העובד תפקיד נוסף ב-IAM בהתאם ליעד הרצוי.
    יעד תפקיד IAM נוסף שנדרש על איזה משאב להחיל את ההרשאה
    Pub/Sub פרסום הודעות ב-Pub/Sub‏ (roles/pubsub.publisher)
    (מידע נוסף זמין במאמר בקרת גישה ב-Pub/Sub באמצעות IAM)
    נושא Pub/Sub
    BigQuery עריכה של נתוני BigQuery‏ (roles/bigquery.dataEditor)
    (מידע נוסף זמין במאמר בנושא בקרת גישה ל-BigQuery באמצעות IAM)
    מערך נתונים של BigQuery
    Cloud Storage אדמין של אובייקט אחסון ב-Cloud Storage‏ (roles/storage.objectAdmin)
    (מידע נוסף זמין במאמר בקרת גישה ל-Cloud Storage באמצעות IAM)
    קטגוריה של Cloud Storage

פרמטרים של תבניות

פרמטר תיאור
schemaLocation המיקום של קובץ הסכימה. לדוגמה: gs://mybucket/filename.json.
qps מספר ההודעות לפרסום בכל שנייה. לדוגמה: 100.
sinkType (אופציונלי) סוג יעד הפלט. הערכים האפשריים הם PUBSUB,‏ BIGQUERY ו-GCS. ברירת המחדל היא PUBSUB.
outputType (אופציונלי) סוג קידוד הפלט. הערכים האפשריים הם JSON,‏ AVRO ו-PARQUET. ברירת המחדל היא JSON.
avroSchemaLocation (אופציונלי) המיקום של קובץ סכימת AVRO. חובה אם הערך של outputType הוא AVRO או PARQUET. לדוגמה: gs://mybucket/filename.avsc.
topic (אופציונלי) שם נושא ה-Pub/Sub שאליו צינור הנתונים צריך לפרסם נתונים. חובה אם הערך של sinkType הוא Pub/Sub. לדוגמה: projects/my-project-id/topics/my-topic-id.
outputTableSpec (אופציונלי) שם טבלת הפלט ב-BigQuery. חובה כש-sinkType הוא BigQuery. לדוגמה: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (אופציונלי) BigQuery Write Disposition. הערכים האפשריים הם WRITE_APPEND, ‏ WRITE_EMPTY או WRITE_TRUNCATE. ברירת המחדל היא WRITE_APPEND.
outputDeadletterTable (אופציונלי) שם טבלת הפלט ב-BigQuery שאליה יועברו הרשומות שנכשלו. אם לא מציינים שם, צינור הנתונים יוצר טבלה במהלך ההרצה עם השם {output_table_name}_error_records. לדוגמה: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (אופציונלי) הנתיב של מיקום הפלט ב-Cloud Storage. חובה אם sinkType הוא Cloud Storage. לדוגמה: gs://mybucket/pathprefix/.
outputFilenamePrefix (אופציונלי) הקידומת של שם הקובץ של קובצי הפלט שנכתבים ב-Cloud Storage. ברירת המחדל היא output-.
windowDuration (אופציונלי) מרווח החלון שבו הפלט נכתב ב-Cloud Storage. ברירת המחדל היא 1m (כלומר, דקה אחת).
numShards (אופציונלי) מספר הרסיסים המקסימלי של הפלט. חובה להשתמש בפרמטר הזה כש-sinkType הוא Cloud Storage, והערך שלו צריך להיות 1 או מספר גבוה יותר.
messagesLimit (אופציונלי) מספר ההודעות המקסימלי בפלט. ברירת המחדל היא 0, שמציין שאין הגבלה.
autoscalingAlgorithm (אופציונלי) האלגוריתם שמשמש להתאמה אוטומטית של מספר העובדים. הערכים האפשריים הם THROUGHPUT_BASED להפעלה של התאמה אוטומטית לעומס או NONE להשבתה.
maxNumWorkers (אופציונלי) מספר מקסימלי של מכונות עובד. לדוגמה: 10.

הפעלת התבנית

המסוף

  1. עוברים לדף Dataflow Create job from template (יצירת משימה מתבנית).
  2. כניסה לדף Create job from template
  3. בשדה שם המשימה, מזינים שם ייחודי למשימה.
  4. אופציונלי: בשדה Regional endpoint (נקודת קצה אזורית), בוחרים ערך מהתפריט הנפתח. אזור ברירת המחדל הוא us-central1.

    רשימת האזורים שבהם אפשר להריץ משימת Dataflow מופיעה במאמר מיקומי Dataflow.

  5. בתפריט הנפתח Dataflow template (תבנית של העברת נתונים), בוחרים באפשרות the Streaming Data Generator template.
  6. בשדות הפרמטרים שמופיעים, מזינים את ערכי הפרמטרים.
  7. לוחצים על הפעלת העבודה.

gcloud

במעטפת או בטרמינל, מריצים את התבנית:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud
  • REGION_NAME: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
  • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
  • VERSION: הגרסה של התבנית שבה רוצים להשתמש

    אפשר להשתמש בערכים הבאים:

    • latest כדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/
    • שם הגרסה, כמו 2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
  • SCHEMA_LOCATION: הנתיב לקובץ הסכימה ב-Cloud Storage. לדוגמה: gs://mybucket/filename.json.
  • QPS: מספר ההודעות לפרסום בשנייה
  • PUBSUB_TOPIC: נושא ה-Pub/Sub של הפלט. לדוגמה: projects/my-project-id/topics/my-topic-id.

API

כדי להריץ את התבנית באמצעות API בארכיטקטורת REST, שולחים בקשת HTTP POST. מידע נוסף על ה-API ועל היקפי ההרשאות שלו זמין במאמר projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud
  • LOCATION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
  • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
  • VERSION: הגרסה של התבנית שבה רוצים להשתמש

    אפשר להשתמש בערכים הבאים:

    • latest כדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/
    • שם הגרסה, כמו 2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
  • SCHEMA_LOCATION: הנתיב לקובץ הסכימה ב-Cloud Storage. לדוגמה: gs://mybucket/filename.json.
  • QPS: מספר ההודעות לפרסום בשנייה
  • PUBSUB_TOPIC: נושא ה-Pub/Sub של הפלט. לדוגמה: projects/my-project-id/topics/my-topic-id.

המאמרים הבאים