תבנית Cloud Storage Text to BigQuery עם פונקציית UDF ב-Python

צינור הנתונים Cloud Storage Text to BigQuery with Python UDF הוא צינור נתונים של אצווה שקורא קובצי טקסט שמאוחסנים ב-Cloud Storage, משנה אותם באמצעות פונקציה מוגדרת על ידי המשתמש (UDF) ב-Python, ומצרף את התוצאה לטבלה ב-BigQuery.

הדרישות לגבי צינורות עיבוד נתונים

  • יוצרים קובץ JSON שמתאר את הסכימה של BigQuery.

    מוודאים שיש מערך JSON ברמה העליונה בשם BigQuery Schema ושהתוכן שלו תואם לתבנית {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

    תבנית האצווה Cloud Storage Text to BigQuery לא תומכת בייבוא נתונים לשדות STRUCT (Record) בטבלת היעד ב-BigQuery.

    הנה דוגמה לסכימת BigQuery בפורמט JSON:

    {
      "BigQuery Schema": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        },
      ]
    }
  • יוצרים קובץ Python ‏ (.py) עם פונקציית ה-UDF שמספקת את הלוגיקה להמרת שורות הטקסט. הפונקציה צריכה להחזיר מחרוזת JSON.

    לדוגמה, הפונקציה הזו מפצלת כל שורה בקובץ CSV ומחזירה מחרוזת JSON אחרי הטרנספורמציה של הערכים.

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

פרמטרים של תבניות

פרמטר תיאור
JSONPath הנתיב gs:// לקובץ ה-JSON שמגדיר את סכימת BigQuery, שמאוחסן ב-Cloud Storage. לדוגמה, gs://path/to/my/schema.json.
pythonExternalTextTransformGcsPath ה-URI של Cloud Storage של קובץ קוד Python שמגדיר את הפונקציה בהגדרת המשתמש (UDF) שרוצים להשתמש בה. לדוגמה, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName השם של פונקציית Python בהגדרת המשתמש (UDF) שרוצים להשתמש בה.
inputFilePattern הנתיב gs:// לטקסט ב-Cloud Storage שרוצים לעבד. לדוגמה, gs://path/to/my/text/data.txt.
outputTable שם הטבלה ב-BigQuery שרוצים ליצור כדי לאחסן בה את הנתונים המעובדים. אם משתמשים מחדש בטבלת BigQuery קיימת, הנתונים מצורפים לטבלת היעד. לדוגמה, my-project-name:my-dataset.my-table.
bigQueryLoadingTemporaryDirectory הספרייה הזמנית לתהליך הטעינה של BigQuery. לדוגמה, gs://my-bucket/my-files/temp_dir.
useStorageWriteApi אופציונלי: אם true, צינור הנתונים משתמש ב- BigQuery Storage Write API. ערך ברירת המחדל הוא false. מידע נוסף מופיע במאמר בנושא שימוש ב-Storage Write API.
useStorageWriteApiAtLeastOnce אופציונלי: כשמשתמשים ב-Storage Write API, מציינים את סמנטיקת הכתיבה. כדי להשתמש ב סמנטיקה של 'לפחות פעם אחת', צריך להגדיר את הפרמטר הזה לערך true. כדי להשתמש בסמנטיקה של שליחה בדיוק פעם אחת, צריך להגדיר את הפרמטר לערך false. הפרמטר הזה רלוונטי רק אם הערך של useStorageWriteApi הוא true. ערך ברירת המחדל הוא false.

פונקציה בהגדרת המשתמש

אפשר גם להרחיב את התבנית הזו על ידי כתיבת פונקציה בהגדרת המשתמש (UDF). התבנית קוראת ל-UDF עבור כל רכיב קלט. מטענים ייעודיים של רכיבים עוברים סריאליזציה כמחרוזות JSON. למידע נוסף, ראו יצירת פונקציות מוגדרות על ידי המשתמש לתבניות Dataflow.

מפרט הפונקציה

המאפיינים של פונקציית UDF:

  • קלט: שורה של טקסט מקובץ קלט של Cloud Storage.
  • פלט: מחרוזת JSON שתואמת לסכימה של טבלת היעד ב-BigQuery.

הפעלת התבנית

המסוף

  1. עוברים לדף Dataflow Create job from template (יצירת משימה מתבנית).
  2. כניסה לדף Create job from template
  3. בשדה שם המשימה, מזינים שם ייחודי למשימה.
  4. אופציונלי: בשדה Regional endpoint (נקודת קצה אזורית), בוחרים ערך מהתפריט הנפתח. אזור ברירת המחדל הוא us-central1.

    רשימת האזורים שבהם אפשר להריץ משימת Dataflow מופיעה במאמר מיקומי Dataflow.

  5. בתפריט הנפתח Dataflow template (תבנית של העברת נתונים), בוחרים באפשרות the Text Files on Cloud Storage to BigQuery with Python UDF (Batch) template.
  6. בשדות הפרמטרים שמופיעים, מזינים את ערכי הפרמטרים.
  7. לוחצים על הפעלת העבודה.

gcloud

במעטפת או בטרמינל, מריצים את התבנית:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --parameters \
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud
  • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
  • VERSION: הגרסה של התבנית שבה רוצים להשתמש

    אפשר להשתמש בערכים הבאים:

    • latest כדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/
    • שם הגרסה, כמו 2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
  • PYTHON_FUNCTION: השם של פונקציה בהגדרת המשתמש (UDF) ב-Python שרוצים להשתמש בה.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: הנתיב ב-Cloud Storage אל קובץ ה-JSON שמכיל את הגדרת הסכימה
  • PATH_TO_PYTHON_UDF_FILE: ה-URI של Cloud Storage של קובץ קוד Python שמגדיר את הפונקציה בהגדרת המשתמש (UDF) שבה רוצים להשתמש. לדוגמה, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: הנתיב ב-Cloud Storage למערך הנתונים של הטקסט
  • BIGQUERY_TABLE: שם הטבלה ב-BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: הנתיב שלכם ב-Cloud Storage לספריית temp

API

כדי להריץ את התבנית באמצעות API בארכיטקטורת REST, שולחים בקשת HTTP POST. מידע נוסף על ה-API ועל היקפי ההרשאות שלו זמין במאמר projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
        "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
        "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
        "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
        "inputFilePattern":"PATH_TO_TEXT_DATA",
        "outputTable":"BIGQUERY_TABLE",
        "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Xlang",
   }
}

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud
  • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
  • VERSION: הגרסה של התבנית שבה רוצים להשתמש

    אפשר להשתמש בערכים הבאים:

    • latest כדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/
    • שם הגרסה, כמו 2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
  • LOCATION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
  • PYTHON_FUNCTION: השם של פונקציה בהגדרת המשתמש (UDF) ב-Python שרוצים להשתמש בה.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: הנתיב ב-Cloud Storage אל קובץ ה-JSON שמכיל את הגדרת הסכימה
  • PATH_TO_PYTHON_UDF_FILE: ה-URI של Cloud Storage של קובץ קוד Python שמגדיר את הפונקציה בהגדרת המשתמש (UDF) שבה רוצים להשתמש. לדוגמה, gs://my-bucket/my-udfs/my_file.py.
  • PATH_TO_TEXT_DATA: הנתיב ב-Cloud Storage למערך הנתונים של הטקסט
  • BIGQUERY_TABLE: שם הטבלה ב-BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: הנתיב שלכם ב-Cloud Storage לספריית temp

המאמרים הבאים