תבנית Datastream to BigQuery (Stream)

תבנית Datastream to BigQuery היא צינור להזרמת נתונים שקורא נתונים מ-Datastream ומשכפל אותם ב-BigQuery. התבנית קוראת נתונים מ-Cloud Storage באמצעות התראות Pub/Sub ומשכפלת אותם לטבלת ביניים מחולקת לפי זמן ב-BigQuery. אחרי השכפול, התבנית מפעילה MERGE ב-BigQuery כדי לבצע פעולת upsert לכל השינויים ב-CDC (לכידת נתוני שינוי) בעותק של טבלת המקור. מציינים את הפרמטר gcsPubSubSubscription כדי לקרוא נתונים מהתראות Pub/Sub, או את הפרמטר inputFilePattern כדי לקרוא נתונים ישירות מקבצים ב-Cloud Storage.

התבנית מטפלת ביצירה ובעדכון של טבלאות BigQuery שמנוהלות על ידי השכפול. כשנדרשת שפת הגדרת נתונים (DDL), מתבצעת קריאה חוזרת ל-Datastream כדי לחלץ את סכימת טבלת המקור ולתרגם אותה לסוגי נתונים של BigQuery. הפעולות הנתמכות כוללות את הפעולות הבאות:

  • טבלאות חדשות נוצרות כשהנתונים מוכנסים.
  • עמודות חדשות מתווספות לטבלאות BigQuery עם ערכי null ראשוניים.
  • המערכת מתעלמת מעמודות שהוסרו ב-BigQuery, והערכים העתידיים הם null.
  • עמודות ששמן שונה מתווספות ל-BigQuery כעמודות חדשות.
  • שינויים בסוג לא מועברים ל-BigQuery.

מומלץ להריץ את צינור הנתונים הזה באמצעות מצב סטרימינג של לפחות פעם אחת, כי התבנית מבצעת ביטול כפילויות כשהיא ממזגת נתונים מטבלת BigQuery זמנית לטבלת BigQuery הראשית. השלב הזה בצינור העיבוד מצביע על כך שאין יתרון נוסף לשימוש במצב סטרימינג של בדיוק פעם אחת.

הדרישות לגבי צינורות עיבוד נתונים

  • מקור נתונים ב-Datastream שמוכן לשכפול נתונים או שכבר משכפל נתונים.
  • התראות Pub/Sub ל-Cloud Storage מופעלות עבור נתוני Datastream.
  • מערכי נתונים של יעד ב-BigQuery נוצרים, ולחשבון השירות של Compute Engine ניתנה גישת אדמין אליהם.
  • כדי ליצור את טבלת הרפליקה ביעד, צריך להגדיר מפתח ראשי בטבלת המקור.
  • מסד נתונים של MySQL או Oracle כמקור. אין תמיכה במסדי נתונים של PostgreSQL ו-SQL Server.

פרמטרים של תבניות

פרמטרים נדרשים

  • inputFileFormat: הפורמט של קובצי הפלט שנוצרו על ידי Datastream. הערכים המותרים הם avro ו-json. ברירת המחדל היא avro.
  • outputStagingDatasetTemplate: שם מערך הנתונים שמכיל טבלאות זמניות. הפרמטר הזה תומך בתבניות, לדוגמה {_metadata_dataset}_log או my_dataset_log. בדרך כלל, הפרמטר הזה הוא שם של קבוצת נתונים. ברירת המחדל היא {_metadata_dataset}. הערה: במקורות MySQL, שם מסד הנתונים ממופה ל-{_metadata_schema} במקום ל-{_metadata_dataset}.
  • outputDatasetTemplate: שם מערך הנתונים שמכיל את טבלאות הרפליקות. הפרמטר הזה תומך בתבניות, לדוגמה {_metadata_dataset} או my_dataset. בדרך כלל, הפרמטר הזה הוא שם של קבוצת נתונים. ברירת המחדל היא {_metadata_dataset}. הערה: במקורות MySQL, שם מסד הנתונים ממופה ל-{_metadata_schema} במקום ל-{_metadata_dataset}.
  • deadLetterQueueDirectory: הנתיב שבו Dataflow משתמש כדי לכתוב את הפלט של תור ההודעות המתות. הנתיב הזה לא יכול להיות זהה לנתיב של פלט הקובץ של Datastream. ברירת המחדל היא empty.

פרמטרים אופציונליים

  • inputFilePattern: מיקום הקובץ של פלט קובץ Datastream ב-Cloud Storage, בפורמט gs://<BUCKET_NAME>/<ROOT_PATH>/.
  • gcsPubSubSubscription: מינוי Pub/Sub שמשמש את Cloud Storage כדי להודיע ל-Dataflow על קבצים חדשים שזמינים לעיבוד, בפורמט: projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • streamName: השם או התבנית של הזרם שצריך לבצע בו סקר כדי לקבל מידע על הסכימה. ברירת המחדל היא: {_metadata_stream}. בדרך כלל ערך ברירת המחדל מספיק.
  • rfcStartDateTime: תאריך ושעת ההתחלה לשימוש באחזור נתונים מ-Cloud Storage ‏ (https://tools.ietf.org/html/rfc3339). ברירת המחדל: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency: מספר הקבצים של DataStream שייקראו בו-זמנית. ברירת המחדל היא 10.
  • outputProjectId: המזהה של הפרויקט ב-Google Cloud שמכיל את מערכי הנתונים ב-BigQuery שאליהם יוצאו הנתונים. ברירת המחדל של הפרמטר הזה היא הפרויקט שבו צינור Dataflow פועל.
  • outputStagingTableNameTemplate: התבנית שמשמשת למתן שמות לטבלאות הזמניות. לדוגמה, {_metadata_table}. ברירת המחדל היא {_metadata_table}_log.
  • outputTableNameTemplate: התבנית שבה יש להשתמש לשם של טבלאות הרפליקה, לדוגמה {_metadata_table}. ברירת המחדל היא {_metadata_table}.
  • ignoreFields: שדות מופרדים בפסיקים שיושמטו ב-BigQuery. ברירת המחדל: _metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count. לדוגמה, _metadata_stream,_metadata_schema.
  • mergeFrequencyMinutes: מספר הדקות בין מיזוגים של טבלה נתונה. ברירת המחדל היא 5.
  • dlqRetryMinutes: מספר הדקות בין ניסיונות חוזרים של DLQ. ברירת המחדל היא 10.
  • dataStreamRootUrl: כתובת הבסיס של Datastream API. ברירת המחדל היא: https://datastream.googleapis.com/.
  • applyMerge: האם להשבית שאילתות מיזוג עבור העבודה. ברירת המחדל היא true.
  • mergeConcurrency: מספר שאילתות ה-MERGE המקבילות ב-BigQuery. ההגדרה הזו תקפה רק אם applyMerge מוגדר כ-True. ברירת המחדל היא 30.
  • partitionRetentionDays: מספר הימים לשימוש בשמירת מחיצות כשמריצים מיזוגים ב-BigQuery. ברירת המחדל היא 1.
  • useStorageWriteApiAtLeastOnce: הפרמטר הזה תקף רק אם Use BigQuery Storage Write API מופעל. אם true, נעשה שימוש בסמנטיקה של 'לפחות פעם אחת' עבור Storage Write API. אחרת, נעשה שימוש בסמנטיקה של 'פעם אחת בדיוק'. ברירת המחדל היא false.
  • datastreamSourceType: עקיפה של זיהוי סוג המקור לנתוני CDC של Datastream. אם מציינים ערך, המערכת תשתמש בו במקום להסיק את סוג המקור מהשדה read_method. ערכים תקינים כוללים 'mysql',‏ 'postgresql',‏ 'oracle' וכו'. הפרמטר הזה שימושי כשהשדה read_method מכיל את הערך 'cdc' ולא ניתן לקבוע את סוג המקור בפועל באופן אוטומטי.
  • javascriptTextTransformGcsPath: ה-URI של Cloud Storage של קובץ ה-‎ .js שמגדיר את הפונקציה בהגדרת המשתמש (UDF) ב-JavaScript שבה רוצים להשתמש. לדוגמה, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: השם של פונקציית JavaScript בהגדרת המשתמש (UDF) שבה רוצים להשתמש. לדוגמה, אם קוד הפונקציה ב-JavaScript הוא myTransform(inJson) { /*...do stuff...*/ }, אז שם הפונקציה הוא myTransform. דוגמאות לפונקציות מוגדרות על ידי המשתמש (UDF) ב-JavaScript זמינות במאמר UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: מציין את התדירות שבה יש לטעון מחדש את הפונקציה המוגדרת על ידי המשתמש, בדקות. אם הערך גדול מ-0, מערכת Dataflow בודקת מעת לעת את קובץ ה-UDF ב-Cloud Storage, ומטעינה מחדש את ה-UDF אם הקובץ משתנה. הפרמטר הזה מאפשר לכם לעדכן את ה-UDF בזמן שהצינור פועל, בלי שתצטרכו להפעיל מחדש את העבודה. אם הערך הוא 0, טעינה מחדש של UDF מושבתת. ערך ברירת המחדל הוא 0.
  • pythonTextTransformGcsPath: תבנית הנתיב ב-Cloud Storage לקוד Python שמכיל את הפונקציות המוגדרות על ידי המשתמש. לדוגמה, gs://your-bucket/your-transforms/*.py.
  • pythonRuntimeVersion: גרסת זמן הריצה לשימוש בפונקציית UDF של Python.
  • pythonTextTransformFunctionName: השם של הפונקציה לקריאה מקובץ ה-JavaScript. אפשר להשתמש רק באותיות, ספרות וקווים תחתונים. לדוגמה, transform_udf1.
  • runtimeRetries: מספר הפעמים שמערכת זמן הריצה תנסה לבצע מחדש את הפעולה לפני שהיא תיכשל. ברירת המחדל היא 5.
  • useStorageWriteApi: אם הערך הוא true, צינור הנתונים משתמש ב-BigQuery Storage Write API‏ (https://cloud.google.com/bigquery/docs/write-api). ערך ברירת המחדל הוא false. מידע נוסף זמין במאמר בנושא שימוש ב-Storage Write API‏ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: כשמשתמשים ב-Storage Write API, מציינים את מספר זרמי הכתיבה. אם useStorageWriteApi הוא true ו-useStorageWriteApiAtLeastOnce הוא false, חובה להגדיר את הפרמטר הזה. ברירת המחדל היא 0.
  • storageWriteApiTriggeringFrequencySec: כשמשתמשים ב-Storage Write API, הפרמטר הזה מציין את תדירות ההפעלה בשניות. אם useStorageWriteApi הוא true ו-useStorageWriteApiAtLeastOnce הוא false, חובה להגדיר את הפרמטר הזה.

פונקציה בהגדרת המשתמש

אפשר גם להרחיב את התבנית הזו על ידי כתיבת פונקציה בהגדרת המשתמש (UDF). התבנית קוראת ל-UDF עבור כל רכיב קלט. מטענים ייעודיים של רכיבים עוברים סריאליזציה כמחרוזות JSON. למידע נוסף, ראו יצירת פונקציות מוגדרות על ידי המשתמש לתבניות Dataflow.

מפרט הפונקציה

המאפיינים של פונקציית UDF:

  • קלט: נתוני ה-CDC, שעברו סריאליזציה כמחרוזת JSON.
  • פלט: מחרוזת JSON שתואמת לסכימה של טבלת היעד ב-BigQuery.
  • הפעלת התבנית

    המסוף

    1. עוברים לדף Dataflow Create job from template (יצירת משימה מתבנית).
    2. כניסה לדף Create job from template
    3. בשדה שם המשימה, מזינים שם ייחודי למשימה.
    4. אופציונלי: בשדה Regional endpoint (נקודת קצה אזורית), בוחרים ערך מהתפריט הנפתח. אזור ברירת המחדל הוא us-central1.

      רשימת האזורים שבהם אפשר להריץ משימת Dataflow מופיעה במאמר מיקומי Dataflow.

    5. בתפריט הנפתח Dataflow template (תבנית של העברת נתונים), בוחרים באפשרות the Datastream to BigQuery template.
    6. בשדות הפרמטרים שמופיעים, מזינים את ערכי הפרמטרים.
    7. אופציונלי: כדי לעבור מעיבוד של כל נתון בדיוק פעם אחת אל מצב סטרימינג של כל נתון לפחות פעם אחת, בוחרים באפשרות לפחות פעם אחת.
    8. לוחצים על הפעלת העבודה.

    gcloud

    במעטפת או בטרמינל, מריצים את התבנית:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud
    • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
    • REGION_NAME: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: הנתיב ב-Cloud Storage לנתוני Datastream. לדוגמה: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: המינוי ל-Pub/Sub שממנו קוראים את הקבצים שהשתנו. לדוגמה: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: השם של מערך הנתונים ב-BigQuery.
    • BIGQUERY_TABLE: תבנית הטבלה ב-BigQuery. לדוגמה, {_metadata_schema}_{_metadata_table}_log.

    API

    כדי להריץ את התבנית באמצעות API בארכיטקטורת REST, שולחים בקשת HTTP POST. מידע נוסף על ה-API ועל היקפי ההרשאות שלו זמין במאמר projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud
    • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
    • LOCATION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: הנתיב ב-Cloud Storage לנתוני Datastream. לדוגמה: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: המינוי ל-Pub/Sub שממנו קוראים את הקבצים שהשתנו. לדוגמה: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: השם של מערך הנתונים ב-BigQuery.
    • BIGQUERY_TABLE: תבנית הטבלה ב-BigQuery. לדוגמה, {_metadata_schema}_{_metadata_table}_log.

    המאמרים הבאים