טעינת נתונים מ-Cloud Storage ל-BigQuery באמצעות Workflows

Last reviewed 2021-05-12 UTC
במדריך הזה מוסבר איך להריץ באופן מהימן תהליכי עבודה ללא שרת באמצעות Workflows,‏ Cloud Run functions ו-Firestore כדי לטעון נתונים גולמיים, כמו יומני אירועים, מ-Cloud Storage ל-BigQuery. בדרך כלל, פלטפורמות ניתוח כוללות כלי לניהול תהליכים שמאפשר לטעון נתונים ב-BigQuery באופן תקופתי באמצעות משימות של BigQuery, ואז לשנות את הנתונים כדי לספק מדדים עסקיים באמצעות הצהרות SQL, כולל הצהרות של שפת תכנות פרוצדורלית של BigQuery. המדריך הזה מיועד למפתחים ולאדריכלים שרוצים ליצור צינורות לעיבוד נתונים מבוסס-אירועים ללא שרתים. המדריך מתבסס על ההנחה שאתם מכירים את YAML,‏ SQL ו-Python.

ארכיטקטורה

התרשים הבא מציג את הארכיטקטורה הכללית של צינור extract, load, and transform (ELT) בלי שרת (serverless) באמצעות Workflows.

צינור עיבוד נתונים לחילוץ, טעינה וטרנספורמציה.

בתרשים שלמעלה, נניח שיש פלטפורמת קמעונאות שאוספת מעת לעת אירועי מכירות כקבצים מחנויות שונות, ואז כותבת את הקבצים לקטגוריה של Cloud Storage. האירועים משמשים ליצירת מדדים עסקיים באמצעות ייבוא ועיבוד ב-BigQuery. הארכיטקטורה הזו מספקת מערכת תזמור אמינה וללא שרתים (serverless) לייבוא קבצים ל-BigQuery, והיא מחולקת לשני המודולים הבאים:

  • רשימת קבצים: רשימת הקבצים שלא עברו עיבוד שנוספו לקטגוריה של Cloud Storage בקולקציה של Firestore. המודול הזה פועל באמצעות פונקציית Cloud Run שמופעלת על ידי אירוע אחסון מסוג Object Finalize, שנוצר כשמוסיפים קובץ חדש לקטגוריה של Cloud Storage. שם הקובץ מצורף למערך files של האוסף שנקרא new ב-Firestore.
  • Workflow: מפעיל את תהליכי העבודה המתוזמנים. ‫Cloud Scheduler מפעיל תהליך עבודה שמבצע סדרה של שלבים בהתאם לתחביר מבוסס YAML כדי לתזמן את הטעינה, ואז להמיר את הנתונים ב-BigQuery באמצעות קריאה לפונקציות של Cloud Run. השלבים בתהליך העבודה מפעילים פונקציות Cloud Run כדי לבצע את המשימות הבאות:

    • יוצרים משימת טעינה ב-BigQuery ומפעילים אותה.
    • שליחת שאילתה לגבי הסטטוס של משימת הטעינה.
    • יוצרים את עבודת השאילתה של הטרנספורמציה ומפעילים אותה.
    • שליחת שאילתה לגבי הסטטוס של משימת השינוי.

שימוש בטרנזקציות כדי לשמור על רשימת הקבצים החדשים ב-Firestore עוזר להבטיח שלא יפספסו קובץ כשמייבאים אותו ל-BigQuery בתהליך עבודה. כדי להפוך את ההרצות הנפרדות של תהליך העבודה לאידמפוטנטיות, מאחסנים את המטא-נתונים והסטטוס של המשימה ב-Firestore.

מטרות

  • יוצרים מסד נתונים ב-Firestore.
  • מגדירים טריגר של פונקציית Cloud Run כדי לעקוב אחרי קבצים שנוספו לקטגוריה של Cloud Storage ב-Firestore.
  • פריסת פונקציות Cloud Run להרצה ולמעקב אחרי משימות BigQuery.
  • פורסים ומריצים תהליך עבודה כדי להפוך את התהליך לאוטומטי.

עלויות

במסמך הזה משתמשים ברכיבים הבאים של Google Cloud, והשימוש בהם כרוך בתשלום:

כדי להעריך את ההוצאות בהתאם לתחזית השימוש שלכם, אתם יכולים להיעזר במחשבון העלויות.

משתמשים חדשים של Google Cloud ? יכול להיות שאתם זכאים לתקופת ניסיון בחינם.

כשמסיימים את המשימות שמתוארות במסמך הזה אפשר למחוק את המשאבים שיצרתם כדי להימנע מחיובים נוספים. מידע נוסף זמין בקטע הסרת המשאבים.

לפני שמתחילים

  1. בדף לבחירת הפרויקט במסוף Google Cloud , בוחרים פרויקט ב- Google Cloud או יוצרים אותו.

    תפקידים שנדרשים כדי לבחור או ליצור פרויקט

    • Select a project: כדי לבחור פרויקט לא צריך תפקיד IAM ספציפי – אפשר לבחור כל פרויקט שקיבלתם בו תפקיד.
    • יצירת פרויקט: כדי ליצור פרויקט, צריך את התפקיד Project Creator (יצירת פרויקטים) (roles/resourcemanager.projectCreator), שכולל את ההרשאה resourcemanager.projects.create. איך מקצים תפקידים

    כניסה לדף לבחירת הפרויקט

  2. מוודאים שהחיוב מופעל בפרויקט Google Cloud .

  3. מפעילים את ממשקי Cloud Build,‏ Cloud Run Functions,‏ ניהול הזהויות והרשאות הגישה, מנהל המשאבים ו-Workflows API.

    תפקידים שנדרשים להפעלת ממשקי API

    כדי להפעיל ממשקי API, צריך את תפקיד ה-IAM 'אדמין של Service Usage' (roles/serviceusage.serviceUsageAdmin), שכולל את ההרשאה serviceusage.services.enable. איך מקצים תפקידים

    הפעלת ממשקי ה-API

  4. עוברים לדף Welcome ורושמים את מזהה הפרויקט כדי להשתמש בו בשלב מאוחר יותר.

    כניסה לדף קבלת פנים

  5. במסוף Google Cloud , מפעילים את Cloud Shell.

    הפעלת Cloud Shell

הכנת הסביבה

כדי להכין את הסביבה, צריך ליצור מסד נתונים של Firestore, לשכפל את דוגמאות הקוד ממאגר GitHub, ליצור משאבים באמצעות Terraform, לערוך את קובץ ה-YAML של Workflows ולהתקין את הדרישות של מחולל הקבצים.

  1. כדי ליצור מסד נתונים של Firestore:

    1. נכנסים לדף Firestore במסוף Google Cloud .

      כניסה אל Firestore

    2. לוחצים על בחירת מצב מותאם.

    3. בתפריט Select a location, בוחרים את האזור שבו רוצים לארח את מסד הנתונים של Firestore. מומלץ לבחור אזור שקרוב למיקום הפיזי שלכם.

    4. לוחצים על יצירת מסד נתונים.

  2. ב-Cloud Shell, משכפלים את מאגר המקור:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. ב-Cloud Shell, יוצרים את המשאבים הבאים באמצעות Terraform:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    מחליפים את מה שכתוב בשדות הבאים:

    • PROJECT_ID: מזהה הפרויקט ב- Google Cloud
    • REGION: מיקום גיאוגרפי ספציפי לאירוח המשאבים, לדוגמה us-central1 Google Cloud
    • ZONE: מיקום באזור לאירוח המשאבים, לדוגמה us-central1-b

    אמורה להופיע הודעה דומה לזו: Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    בעזרת Terraform אפשר ליצור, לשנות ולשדרג תשתית באופן בטוח וצפוי, ובקנה מידה גדול. המשאבים הבאים נוצרים בפרויקט:

    • חשבונות שירות עם ההרשאות הנדרשות כדי להבטיח גישה מאובטחת למשאבים.
    • מערך נתונים ב-BigQuery בשם serverless_elt_dataset וטבלה בשם word_count לטעינת הקבצים הנכנסים.
    • קטגוריה של Cloud Storage בשם ${project_id}-ordersbucket לאחסון זמני של קובצי קלט.
    • חמש פונקציות Cloud Run:
      • file_add_handler מוסיף את שמות הקבצים שנוספו לקטגוריה של Cloud Storage לאוסף Firestore.
      • create_job יוצר משימת טעינה חדשה ב-BigQuery ומשייך קבצים באוסף Firebase למשימה.
      • create_query יוצר משימת שאילתה חדשה ב-BigQuery.
      • poll_bigquery_job מקבל את הסטטוס של משימת BigQuery.
      • run_bigquery_job מתחילה משימת BigQuery.
  4. מקבלים את כתובות ה-URL של פונקציות Cloud Run‏ create_job, create_query, poll_job ו-run_bigquery_job שפרסתם בשלב הקודם.

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    הפלט אמור להיראות כך:

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    חשוב לרשום את כתובות האתרים האלה, כי תצטרכו אותן כשפריסת תהליך העבודה תסתיים.

יצירה ופריסה של תהליך עבודה

  1. ב-Cloud Shell, פותחים את קובץ המקור של זרימת העבודה, workflow.yaml:

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    מחליפים את מה שכתוב בשדות הבאים:

    • CREATE_JOB_URL: כתובת ה-URL של הפונקציה ליצירת משימה חדשה
    • POLL_BIGQUERY_JOB_URL: כתובת ה-URL של הפונקציה לבדיקת הסטטוס של משימה שפועלת
    • RUN_BIGQUERY_JOB_URL: כתובת ה-URL של הפונקציה להפעלת משימת טעינה ב-BigQuery
    • CREATE_QUERY_URL: כתובת ה-URL של הפונקציה להפעלת משימת שאילתה ב-BigQuery
    • BQ_REGION: האזור ב-BigQuery שבו הנתונים מאוחסנים – למשל, US
    • BQ_DATASET_TABLE_NAME: שם הטבלה במערך הנתונים ב-BigQuery בפורמט PROJECT_ID.serverless_elt_dataset.word_count
  2. פורסים את הקובץ workflow:

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    מחליפים את מה שכתוב בשדות הבאים:

    • WORKFLOW_NAME: השם הייחודי של תהליך העבודה
    • WORKFLOW_REGION: האזור שבו תהליך העבודה נפרס. לדוגמה, us-central1
    • WORKFLOW_DESCRIPTION: תיאור תהליך העבודה
  3. יוצרים סביבה וירטואלית של Python 3 ומתקינים את הדרישות של מחולל הקבצים:

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

יצירת קבצים לייבוא

gen.py סקריפט Python יוצר תוכן אקראי בפורמט Avro. הסכימה זהה לטבלת word_count ב-BigQuery. קבצי ה-Avro האלה מועתקים לקטגוריה שצוינה ב-Cloud Storage.

ב-Cloud Shell, יוצרים את הקבצים:

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

מחליפים את מה שכתוב בשדות הבאים:

  • RECORDS_PER_FILE: מספר הרשומות בקובץ אחד
  • NUM_FILES: המספר הכולל של הקבצים שיועלו
  • FILE_PREFIX: הקידומת לשמות של הקבצים שנוצרו

הצגת רשומות של קבצים ב-Firestore

כשמעתיקים את הקבצים ל-Cloud Storage, מופעלת פונקציית Cloud Run‏ handle_new_file. הפונקציה הזו מוסיפה את רשימת הקבצים למערך רשימת הקבצים במסמך new באוסף jobs של Firestore.

כדי לראות את רשימת הקבצים, במסוף Google Cloud עוברים לדף Data של Firestore.

מעבר אל 'נתונים'

רשימת הקבצים שנוספו לאוסף.

הפעלת תהליך העבודה

‫Workflows מקשר בין סדרה של משימות ללא שרת מ-Google Cloud ומממשקי API. כל שלב בתהליך העבודה הזה מופעל כפונקציית Cloud Run, והמצב מאוחסן ב-Firestore. כל הקריאות לפונקציות Cloud Run מאומתות באמצעות חשבון השירות של זרימת העבודה.

ב-Cloud Shell, מריצים את זרימת העבודה:

gcloud workflows execute WORKFLOW_NAME

התרשים הבא מציג את השלבים בתהליך העבודה:

השלבים שמשמשים בתהליך העבודה הראשי ובתהליך העבודה המשני.

תהליך העבודה מחולק לשני חלקים: תהליך העבודה הראשי ותהליך העבודה המשני. תהליך העבודה הראשי מטפל ביצירת משימות ובהרצה מותנית, בעוד שתהליך העבודה המשני מריץ משימת BigQuery. תהליך העבודה מבצע את הפעולות הבאות:

  • הפונקציה create_job Cloud Run יוצרת אובייקט חדש של עבודה, מקבלת את רשימת הקבצים שנוספו ל-Cloud Storage ממסמך Firestore ומשייכת את הקבצים לעבודת הטעינה. אם אין קבצים לטעינה, הפונקציה לא יוצרת משימה חדשה.
  • פונקציית create_query Cloud Run מקבלת את השאילתה שצריך להריץ ואת האזור ב-BigQuery שבו השאילתה צריכה לפעול. הפונקציה יוצרת את המשימה ב-Firestore ומחזירה את מזהה המשימה.
  • פונקציית run_bigquery_job Cloud Run מקבלת את המזהה של המשימה שצריך להריץ, ואז מפעילה את BigQuery API כדי לשלוח את המשימה.
  • במקום לחכות לסיום העבודה בפונקציית Cloud Run, אפשר לדגום את סטטוס העבודה באופן תקופתי.
    • פונקציית poll_bigquery_job Cloud Run מספקת את הסטטוס של העבודה. הפונקציה נקראת שוב ושוב עד שהעבודה מסתיימת.
    • כדי להוסיף השהיה בין הקריאות לפונקציית poll_bigquery_jobCloud Run, קוראים לשגרה sleep מ-Workflows.

צפייה בסטטוס של המשימה

אפשר לראות את רשימת הקבצים ואת הסטטוס של העבודה.

  1. במסוףGoogle Cloud , עוברים לדף Data ב-Firestore.

    מעבר אל 'נתונים'

  2. מזהה ייחודי (UUID) נוצר לכל משימה. כדי להציג את job_type ואת status, לוחצים על מזהה המשימה. כל עבודה יכולה להיות מסוג מסוים ולכלול סטטוס מסוים מתוך האפשרויות הבאות:

    • job_type: סוג העבודה שמופעלת על ידי תהליך העבודה, עם אחד מהערכים הבאים:

      • ‫0: טעינת נתונים ל-BigQuery.
      • ‫1: מריצים שאילתה ב-BigQuery.
    • status: המצב הנוכחי של העבודה, עם אחד מהערכים הבאים:

      • ‫0: העבודה נוצרה, אבל לא הופעלה.
      • ‫1: העבודה פועלת.
      • ‫2: העבודה הושלמה בהצלחה.
      • ‫3: הייתה שגיאה והעבודה לא הושלמה בהצלחה.

    אובייקט המשימה מכיל גם מאפייני מטא-נתונים, כמו האזור של מערך הנתונים ב-BigQuery, השם של הטבלה ב-BigQuery, ואם מדובר במשימת שאילתה, מחרוזת השאילתה שמופעלת.

רשימת קבצים עם סטטוס העבודה שמודגש.

הצגת נתונים ב-BigQuery

כדי לוודא שמשימת ה-ELT בוצעה בהצלחה, בודקים שהנתונים מופיעים בטבלה.

  1. במסוף Google Cloud , עוברים לדף Editor ב-BigQuery.

    כניסה ל-Editor

  2. לוחצים על הטבלה serverless_elt_dataset.word_count.

  3. לוחצים על הכרטיסייה תצוגה מקדימה.

    כרטיסיית תצוגה מקדימה שבה מוצגים נתונים בטבלה.

תזמון תהליך העבודה

כדי להפעיל את תהליך העבודה באופן מחזורי לפי לוח זמנים, אפשר להשתמש ב-Cloud Scheduler.

הסרת המשאבים

הדרך הקלה ביותר לבטל את החיוב היא למחוק את Google Cloud הפרויקט שיצרתם בשביל המדריך. אפשר גם למחוק את המשאבים בנפרד.

מחיקת המשאבים הבודדים

  1. ב-Cloud Shell, מסירים את כל המשאבים שנוצרו באמצעות Terraform:

    cd $HOME/bigquery-workflows-load
    terraform destroy \
    -var project_id=PROJECT_ID \
    -var region=REGION \
    -var zone=ZONE \
    --auto-approve
    
  2. נכנסים לדף Data של Firestore במסוף Google Cloud .

    מעבר אל 'נתונים'

  3. לצד משרות, לוחצים על תפריט ובוחרים באפשרות מחיקה.

    נתיב התפריט למחיקת אוסף.

מחיקת הפרויקט

  1. במסוף Google Cloud , נכנסים לדף Manage resources.

    כניסה לדף Manage resources

  2. ברשימת הפרויקטים, בוחרים את הפרויקט שרוצים למחוק ולוחצים על Delete.
  3. כדי למחוק את הפרויקט, כותבים את מזהה הפרויקט בתיבת הדו-שיח ולוחצים על Shut down.

המאמרים הבאים