תבנית Cloud Storage Text to BigQuery (Stream)

צינור הנתונים Cloud Storage Text to BigQuery הוא צינור נתונים להזרמת נתונים, שמזרים קובצי טקסט שמאוחסנים ב-Cloud Storage, מבצע בהם טרנספורמציה באמצעות פונקציה מוגדרת על ידי המשתמש (UDF) ב-JavaScript שאתם מספקים, ומצרף את התוצאה ל-BigQuery.

הצינור פועל ללא הגבלת זמן, וצריך להפסיק אותו ידנית באמצעות ביטול ולא באמצעות ריקון, כי הוא משתמש בטרנספורמציה Watch, שהיא DoFn שניתן לפיצול ולא תומכת בריקון.

הדרישות לגבי צינורות עיבוד נתונים

  • יוצרים קובץ JSON שמתאר את הסכימה של טבלת הפלט ב-BigQuery.

    מוודאים שיש מערך JSON ברמה העליונה בשם fields ושהתוכן שלו תואם לתבנית {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. לדוגמה:

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
  • יוצרים קובץ JavaScript ‏ (.js) עם פונקציית ה-UDF שמספקת את הלוגיקה להמרת שורות הטקסט. הפונקציה צריכה להחזיר מחרוזת JSON.

    בדוגמה הבאה, כל שורה בקובץ CSV מפוצלת, נוצר אובייקט JSON עם הערכים ומוחזרת מחרוזת JSON:

    function process(inJson) {
      val = inJson.split(",");
    
      const obj = {
        "name": val[0],
        "age": parseInt(val[1])
      };
      return JSON.stringify(obj);
    }

פרמטרים של תבניות

פרמטרים נדרשים

  • inputFilePattern: הנתיב gs:// לטקסט ב-Cloud Storage שרוצים לעבד. לדוגמה, gs://your-bucket/your-file.txt.
  • JSONPath: הנתיב gs:// לקובץ ה-JSON שמגדיר את הסכימה של BigQuery, שמאוחסן ב-Cloud Storage. לדוגמה, gs://your-bucket/your-schema.json.
  • outputTable: המיקום של טבלת BigQuery שבה יש לאחסן את הנתונים המעובדים. אם משתמשים מחדש בטבלה קיימת, היא נדרסת. לדוגמה, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • javascriptTextTransformGcsPath: ה-URI של Cloud Storage של קובץ .js שמגדיר את הפונקציה בהגדרת המשתמש (UDF) ב-JavaScript שרוצים להשתמש בה. לדוגמה, gs://your-bucket/your-transforms/*.js.
  • javascriptTextTransformFunctionName: השם של פונקציית JavaScript בהגדרת המשתמש (UDF) שבה רוצים להשתמש. לדוגמה, אם קוד הפונקציה ב-JavaScript הוא myTransform(inJson) { /*...do stuff...*/ }, אז שם הפונקציה הוא myTransform. דוגמאות לפונקציות מוגדרות על ידי המשתמש ב-JavaScript זמינות במאמר UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). לדוגמה, transform_udf1.
  • bigQueryLoadingTemporaryDirectory: ספרייה זמנית לתהליך הטעינה של BigQuery. לדוגמה, gs://your-bucket/your-files/temp-dir.

פרמטרים אופציונליים

  • outputDeadletterTable: טבלה להודעות שלא הצליחו להגיע לטבלת הפלט. אם טבלה לא קיימת, היא נוצרת במהלך הרצת צינור הנתונים. אם לא מציינים ערך, המערכת משתמשת בערך <outputTableSpec>_error_records. לדוגמה, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • useStorageWriteApiAtLeastOnce: הפרמטר הזה תקף רק אם Use BigQuery Storage Write API מופעל. אם האפשרות הזו מופעלת, נעשה שימוש בסמנטיקה של 'לפחות פעם אחת' עבור Storage Write API. אחרת, נעשה שימוש בסמנטיקה של 'בדיוק פעם אחת'. ברירת המחדל היא: false.
  • useStorageWriteApi: אם הערך הוא true, צינור הנתונים משתמש ב-BigQuery Storage Write API‏ (https://cloud.google.com/bigquery/docs/write-api). ערך ברירת המחדל הוא false. מידע נוסף זמין במאמר בנושא שימוש ב-Storage Write API‏ (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: כשמשתמשים ב-Storage Write API, מציינים את מספר זרמי הכתיבה. אם useStorageWriteApi הוא true ו-useStorageWriteApiAtLeastOnce הוא false, חובה להגדיר את הפרמטר הזה. ברירת המחדל היא 0.
  • storageWriteApiTriggeringFrequencySec: כשמשתמשים ב-Storage Write API, הפרמטר הזה מציין את תדירות ההפעלה בשניות. אם useStorageWriteApi הוא true ו-useStorageWriteApiAtLeastOnce הוא false, חובה להגדיר את הפרמטר הזה.
  • pythonExternalTextTransformGcsPath: תבנית הנתיב ב-Cloud Storage לקוד Python שמכיל את הפונקציות המוגדרות על ידי המשתמש. לדוגמה, gs://your-bucket/your-function.py.
  • javascriptTextTransformReloadIntervalMinutes: מציין את התדירות שבה יש לטעון מחדש את הפונקציה המוגדרת על ידי המשתמש, בדקות. אם הערך גדול מ-0, מערכת Dataflow בודקת מעת לעת את קובץ ה-UDF ב-Cloud Storage, ומטעינה מחדש את ה-UDF אם הקובץ משתנה. הפרמטר הזה מאפשר לכם לעדכן את ה-UDF בזמן שהצינור פועל, בלי שתצטרכו להפעיל מחדש את העבודה. אם הערך הוא 0, טעינה מחדש של UDF מושבתת. ערך ברירת המחדל הוא 0.

פונקציה בהגדרת המשתמש

התבנית הזו מחייבת UDF שמנתח את קובצי הקלט, כפי שמתואר בקטע דרישות של צינורות. התבנית קוראת ל-UDF לכל שורת טקסט בכל קובץ קלט. מידע נוסף על יצירת פונקציות UDF זמין במאמר יצירת פונקציות מוגדרות על ידי המשתמש לתבניות Dataflow.

מפרט הפונקציה

המאפיינים של פונקציית UDF:

  • קלט: שורה אחת של טקסט מקובץ קלט.
  • פלט: מחרוזת JSON שתואמת לסכימה של טבלת היעד ב-BigQuery.

הפעלת התבנית

המסוף

  1. עוברים לדף Dataflow Create job from template (יצירת משימה מתבנית).
  2. כניסה לדף Create job from template
  3. בשדה שם המשימה, מזינים שם ייחודי למשימה.
  4. אופציונלי: בשדה Regional endpoint (נקודת קצה אזורית), בוחרים ערך מהתפריט הנפתח. אזור ברירת המחדל הוא us-central1.

    רשימת האזורים שבהם אפשר להריץ משימת Dataflow מופיעה במאמר מיקומי Dataflow.

  5. בתפריט הנפתח Dataflow template (תבנית של העברת נתונים), בוחרים באפשרות the Cloud Storage Text to BigQuery (Stream) template.
  6. בשדות הפרמטרים שמופיעים, מזינים את ערכי הפרמטרים.
  7. לוחצים על הפעלת העבודה.

gcloud

במעטפת או בטרמינל, מריצים את התבנית:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

מחליפים את מה שכתוב בשדות הבאים:

  • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
  • REGION_NAME: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
  • VERSION: הגרסה של התבנית שבה רוצים להשתמש

    אפשר להשתמש בערכים הבאים:

    • latest כדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/
    • שם הגרסה, כמו 2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: המיקום של קבצים מקומיים להעברה (לדוגמה, gs://your-bucket/staging)
  • JAVASCRIPT_FUNCTION: השם של פונקציית JavaScript בהגדרת המשתמש (UDF) שרוצים להשתמש בה

    לדוגמה, אם קוד הפונקציה ב-JavaScript הוא myTransform(inJson) { /*...do stuff...*/ }, אז שם הפונקציה הוא myTransform. דוגמאות ל-UDF ב-JavaScript זמינות במאמר דוגמאות ל-UDF.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: הנתיב ב-Cloud Storage אל קובץ ה-JSON שמכיל את הגדרת הסכימה
  • PATH_TO_JAVASCRIPT_UDF_FILE: ה-URI של Cloud Storage של קובץ .js שמגדיר את הפונקציה בהגדרת המשתמש (UDF) ב-JavaScript שרוצים להשתמש בה – לדוגמה, gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: הנתיב ב-Cloud Storage למערך הנתונים של הטקסט
  • BIGQUERY_TABLE: שם הטבלה ב-BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: השם של הטבלה ב-BigQuery שבה מאוחסנות הודעות שלא עברו עיבוד
  • PATH_TO_TEMP_DIR_ON_GCS: הנתיב ב-Cloud Storage לספריית הזמנית

API

כדי להריץ את התבנית באמצעות API בארכיטקטורת REST, שולחים בקשת HTTP POST. מידע נוסף על ה-API ועל היקפי ההרשאות שלו זמין במאמר projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex",
   }
}

מחליפים את מה שכתוב בשדות הבאים:

  • PROJECT_ID: מזהה הפרויקט שבו רוצים להריץ את משימת Dataflow Google Cloud
  • JOB_NAME: שם ייחודי של המשימה לפי בחירתכם
  • LOCATION: האזור שבו רוצים לפרוס את עבודת Dataflow, לדוגמה: us-central1
  • VERSION: הגרסה של התבנית שבה רוצים להשתמש

    אפשר להשתמש בערכים הבאים:

    • latest כדי להשתמש בגרסה העדכנית של התבנית, שזמינה בתיקיית ההורה ללא תאריך בדלי – gs://dataflow-templates-REGION_NAME/latest/
    • שם הגרסה, כמו 2023-09-12-00_RC00, כדי להשתמש בגרסה ספציפית של התבנית, שאפשר למצוא אותה בתיקיית האב המתאימה עם התאריך בדלי – gs://dataflow-templates-REGION_NAME/
  • STAGING_LOCATION: המיקום של קבצים מקומיים להעברה (לדוגמה, gs://your-bucket/staging)
  • JAVASCRIPT_FUNCTION: השם של פונקציית JavaScript בהגדרת המשתמש (UDF) שרוצים להשתמש בה

    לדוגמה, אם קוד הפונקציה ב-JavaScript הוא myTransform(inJson) { /*...do stuff...*/ }, אז שם הפונקציה הוא myTransform. דוגמאות ל-UDF ב-JavaScript זמינות במאמר דוגמאות ל-UDF.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: הנתיב ב-Cloud Storage אל קובץ ה-JSON שמכיל את הגדרת הסכימה
  • PATH_TO_JAVASCRIPT_UDF_FILE: ה-URI של Cloud Storage של קובץ .js שמגדיר את הפונקציה בהגדרת המשתמש (UDF) ב-JavaScript שרוצים להשתמש בה – לדוגמה, gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: הנתיב ב-Cloud Storage למערך הנתונים של הטקסט
  • BIGQUERY_TABLE: שם הטבלה ב-BigQuery
  • BIGQUERY_UNPROCESSED_TABLE: השם של הטבלה ב-BigQuery שבה מאוחסנות הודעות שלא עברו עיבוד
  • PATH_TO_TEMP_DIR_ON_GCS: הנתיב ב-Cloud Storage לספריית הזמנית

המאמרים הבאים