תבנית של פונקציה מוגדרת על ידי המשתמש (UDF) ב-Python לטקסט ב-Cloud Storage ל-BigQuery (סטרימינג)

צינור הנתונים Cloud Storage Text to BigQuery הוא צינור נתונים להזרמה שמזרים קובצי טקסט שמאוחסנים ב-Cloud Storage, משנה אותם באמצעות פונקציה מוגדרת על ידי המשתמש (UDF) ב-Python שאתם מספקים, ומצרף את התוצאה ל-BigQuery.

הצינור פועל ללא הגבלת זמן, וצריך להפסיק אותו ידנית באמצעות ביטול ולא באמצעות ריקון, כי הוא משתמש בטרנספורמציה Watch, שהיא DoFn שניתן לפיצול ולא תומכת בריקון.

הדרישות לגבי צינורות עיבוד נתונים

  • יוצרים קובץ JSON שמתאר את הסכימה של טבלת הפלט ב-BigQuery.

    מוודאים שיש מערך JSON ברמה העליונה בשם fields ושהתוכן שלו תואם לתבנית {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. לדוגמה:

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
  • יוצרים קובץ Python ‏ (.py) עם פונקציית ה-UDF שמספקת את הלוגיקה להמרת שורות הטקסט. הפונקציה צריכה להחזיר מחרוזת JSON.

    בדוגמה הבאה, כל שורה בקובץ CSV מפוצלת, נוצר אובייקט JSON עם הערכים ומוחזרת מחרוזת JSON:

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

פרמטרים של תבניות

פרמטרים נדרשים

  • inputFilePattern: הנתיב gs:// לטקסט ב-Cloud Storage שרוצים לעבד. לדוגמה, gs://your-bucket/your-file.txt.
  • JSONPath: הנתיב gs:// לקובץ ה-JSON שמגדיר את הסכימה של BigQuery, שמאוחסן ב-Cloud Storage. לדוגמה, gs://your-bucket/your-schema.json.
  • outputTable: המיקום של טבלת BigQuery שבה יש לאחסן את הנתונים המעובדים. אם משתמשים מחדש בטבלה קיימת, היא נדרסת. לדוגמה, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • bigQueryLoadingTemporaryDirectory: ספרייה זמנית לתהליך הטעינה של BigQuery. לדוגמה, gs://your-bucket/your-files/temp-dir.

פרמטרים אופציונליים

  • outputDeadletterTable: טבלה להודעות שלא הצליחו להגיע לטבלת הפלט. אם טבלה לא קיימת, היא נוצרת במהלך הרצת צינור הנתונים. אם לא מציינים ערך, המערכת משתמשת בערך <outputTableSpec>_error_records. לדוגמה, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • useStorageWriteApiAtLeastOnce: הפרמטר הזה תקף רק אם Use BigQuery Storage Write API מופעל. אם האפשרות הזו מופעלת, נעשה שימוש בסמנטיקה של 'לפחות פעם אחת' עבור Storage Write API. אחרת, נעשה שימוש בסמנטיקה של 'בדיוק פעם אחת'. ברירת המחדל היא: false.
  • useStorageWriteApi: אם הערך הוא true, צינור הנתונים משתמש ב-BigQuery Storage Write API‏ (https://cloud.google.com/bigquery/docs/write-api). ערך ברירת המחדל הוא false. מידע נוסף זמין במאמר בנושא שימוש ב-Storage Write API‏ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: כשמשתמשים ב-Storage Write API, מציינים את מספר זרמי הכתיבה. אם useStorageWriteApi הוא true ו-useStorageWriteApiAtLeastOnce הוא false, חובה להגדיר את הפרמטר הזה. ברירת המחדל היא 0.
  • storageWriteApiTriggeringFrequencySec: כשמשתמשים ב-Storage Write API, הפרמטר הזה מציין את תדירות ההפעלה בשניות. אם useStorageWriteApi הוא true ו-useStorageWriteApiAtLeastOnce הוא false, חובה להגדיר את הפרמטר הזה.
  • pythonExternalTextTransformGcsPath: תבנית הנתיב ב-Cloud Storage לקוד Python שמכיל את הפונקציות המוגדרות על ידי המשתמש. לדוגמה, gs://your-bucket/your-function.py.
  • pythonExternalTextTransformFunctionName: שם הפונקציה לקריאה מקובץ Python. אפשר להשתמש רק באותיות, ספרות וקווים תחתונים. לדוגמה, 'transform' or 'transform_udf1'.

פונקציה בהגדרת המשתמש

התבנית הזו מחייבת UDF שמנתח את קובצי הקלט, כפי שמתואר בקטע דרישות של צינורות. התבנית קוראת ל-UDF לכל שורת טקסט בכל קובץ קלט. מידע נוסף על יצירת פונקציות UDF זמין במאמר יצירת פונקציות מוגדרות על ידי המשתמש לתבניות Dataflow.

מפרט הפונקציה

המאפיינים של פונקציית UDF:

  • קלט: שורה אחת של טקסט מקובץ קלט.
  • פלט: מחרוזת JSON שתואמת לסכימה של טבלת היעד ב-BigQuery.

הפעלת התבנית

המסוף

  1. עוברים לדף Dataflow Create job from template (יצירת משימה מתבנית).
  2. כניסה לדף Create job from template
  3. בשדה שם המשימה, מזינים שם ייחודי למשימה.
  4. אופציונלי: בשדה Regional endpoint (נקודת קצה אזורית), בוחרים ערך מהתפריט הנפתח. אזור ברירת המחדל הוא us-central1.

    רשימת האזורים שבהם אפשר להריץ משימת Dataflow מופיעה במאמר מיקומי Dataflow.

  5. בתפריט הנפתח Dataflow template (תבנית של העברת נתונים), בוחרים באפשרות the Cloud Storage Text to BigQuery (Stream) with Python UDF template.
  6. בשדות הפרמטרים שמופיעים, מזינים את ערכי הפרמטרים.
  7. לוחצים על הפעלת העבודה.

gcloud

במעטפת או בטרמינל, מריצים את התבנית:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

מחליפים את מה שכתוב בשדות הבאים:

  • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
  • REGION_NAME: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
  • VERSION: הגרסה של התבנית שבה רוצים להשתמש

    אפשר להשתמש בערכים הבאים:

    • latest כדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/
    • שם הגרסה, כמו 2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: המיקום של קבצים מקומיים להעברה (לדוגמה, gs://your-bucket/staging)
  • PYTHON_FUNCTION: השם של פונקציה בהגדרת המשתמש (UDF) ב-Python שרוצים להשתמש בה.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: הנתיב ב-Cloud Storage אל קובץ ה-JSON שמכיל את הגדרת הסכימה
  • PATH_TO_PYTHON_UDF_FILE: ה-URI של Cloud Storage של קובץ קוד Python שמגדיר את הפונקציה בהגדרת המשתמש (UDF) שבה רוצים להשתמש. לדוגמה, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: הנתיב ב-Cloud Storage למערך הנתונים של הטקסט
  • BIGQUERY_TABLE: שם הטבלה ב-BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: השם של הטבלה ב-BigQuery שבה מאוחסנות הודעות שלא עברו עיבוד
  • PATH_TO_TEMP_DIR_ON_GCS: הנתיב ב-Cloud Storage לספריית הזמנית

API

כדי להריץ את התבנית באמצעות API בארכיטקטורת REST, שולחים בקשת HTTP POST. מידע נוסף על ה-API ועל היקפי ההרשאות שלו זמין במאמר projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang",
   }
}

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud
  • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
  • LOCATION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
  • VERSION: הגרסה של התבנית שבה רוצים להשתמש

    אפשר להשתמש בערכים הבאים:

    • latest כדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/
    • שם הגרסה, כמו 2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: המיקום של קבצים מקומיים להעברה (לדוגמה, gs://your-bucket/staging)
  • PYTHON_FUNCTION: השם של פונקציה בהגדרת המשתמש (UDF) ב-Python שרוצים להשתמש בה.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: הנתיב ב-Cloud Storage אל קובץ ה-JSON שמכיל את הגדרת הסכימה
  • PATH_TO_PYTHON_UDF_FILE: ה-URI של Cloud Storage של קובץ קוד Python שמגדיר את הפונקציה בהגדרת המשתמש (UDF) שבה רוצים להשתמש. לדוגמה, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: הנתיב ב-Cloud Storage למערך הנתונים של הטקסט
  • BIGQUERY_TABLE: שם הטבלה ב-BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: השם של הטבלה ב-BigQuery שבה מאוחסנות הודעות שלא עברו עיבוד
  • PATH_TO_TEMP_DIR_ON_GCS: הנתיב ב-Cloud Storage לספריית הזמנית

המאמרים הבאים