使用工作流程將資料從 Cloud Storage 載入至 BigQuery

本教學課程說明如何使用 WorkflowsCloud Run functionsFirestore,以無伺服器工作流程可靠地從 Cloud Storage 將原始資料 (例如事件記錄) 載入 BigQuery。 分析平台通常會提供協調工具,定期使用 BigQuery 工作將資料載入 BigQuery,然後使用 SQL 陳述式 (包括 BigQuery 程序語言陳述式) 轉換資料,以提供業務指標。本教學課程適用於想建構無伺服器事件導向資料處理管道的開發人員和架構師。本教學課程假設您熟悉 YAML、SQL 和 Python。

架構

下圖顯示無伺服器擷取、載入及轉換 (ELT) 管道的整體架構,該管道使用 Workflows

擷取、載入及轉換 pipeline。

在上圖中,假設零售平台會定期從各家商店收集銷售事件做為檔案,然後將檔案寫入 Cloud Storage bucket。這些事件會匯入 BigQuery 並經過處理,以提供業務指標。這個架構提供可靠的無伺服器協調系統,可將檔案匯入 BigQuery,並分為下列兩個模組:

  • 檔案清單:維護 Firestore 集合中新增至 Cloud Storage 值區的未處理檔案清單。這個模組會透過 Cloud Run 函式運作,並由「物件完成」儲存空間事件觸發,當新檔案新增至 Cloud Storage bucket 時,就會產生這個事件。檔案名稱會附加至 Firestore 中名為 new 的集合陣列。files
  • 工作流程:執行排定的工作流程。Cloud Scheduler 會觸發工作流程,根據 YAML 語法執行一系列步驟,協調載入作業,然後呼叫 Cloud Run 函式,轉換 BigQuery 中的資料。工作流程中的步驟會呼叫 Cloud Run functions,執行下列工作:

    • 建立並啟動 BigQuery 載入工作。
    • 輪詢載入工作狀態。
    • 建立並啟動轉換查詢工作。
    • 輪詢轉換工作狀態。

使用交易在 Firestore 中維護新檔案清單,有助於確保工作流程將檔案匯入 BigQuery 時不會遺漏任何檔案。工作流程的個別執行作業會將工作的中繼資料和狀態儲存在 Firestore 中,藉此確保冪等性。

準備環境

如要準備環境,請建立 Firestore 資料庫、從 GitHub 存放區複製程式碼範例、使用 Terraform 建立資源、編輯 Workflows YAML 檔案,以及安裝檔案產生器的必要條件。

  1. 如要建立 Firestore 資料庫,請按照下列步驟操作:

    1. 前往 Google Cloud 控制台的 Firestore 頁面。

      前往 Firestore

    2. 按一下「選取原生模式」

    3. 在「選取位置」選單中,選取要代管 Firestore 資料庫的區域。建議選擇靠近您實際位置的區域。

    4. 按一下 [Create database] (建立資料庫)。

  2. 在 Cloud Shell 中,複製來源存放區:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. 在 Cloud Shell 中,使用 Terraform 建立下列資源:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    更改下列內容:

    • PROJECT_ID:您的 Google Cloud 專案 ID
    • REGION:用於代管資源的特定 Google Cloud 地理位置,例如 us-central1
    • ZONE:區域內用於代管資源的位置,例如 us-central1-b

    畫面會顯示類似以下的訊息: Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    Terraform 可協助您以安全且可預測的方式,大規模建立、變更及升級基礎架構。專案中會建立下列資源:

    • 服務帳戶:具備必要權限,確保資源存取安全無虞。
    • 名為 serverless_elt_dataset 的 BigQuery 資料集和名為 word_count 的資料表,用於載入傳入的檔案。
    • 名為 ${project_id}-ordersbucket 的 Cloud Storage bucket,用於暫存輸入檔案。
    • 下列五個 Cloud Run 函式:
      • file_add_handler 會將新增至 Cloud Storage bucket 的檔案名稱新增至 Firestore 集合。
      • create_job 會建立新的 BigQuery 載入工作,並將 Firebase 集合中的檔案與該工作建立關聯。
      • create_query 會建立新的 BigQuery 查詢工作。
      • poll_bigquery_job 會取得 BigQuery 工作的狀態。
      • run_bigquery_job 會啟動 BigQuery 工作。
  4. 取得您在上一個步驟中部署的 create_jobcreate_querypoll_jobrun_bigquery_job Cloud Run 函式網址。

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    輸出結果會與下列內容相似:

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    請記下這些網址,部署工作流程時會用到。

建立及部署工作流程

  1. 在 Cloud Shell 中開啟工作流程的來源檔案: workflow.yaml

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    更改下列內容:

    • CREATE_JOB_URL:用於建立新工作的函式網址
    • POLL_BIGQUERY_JOB_URL:輪詢執行中作業狀態的函式網址
    • RUN_BIGQUERY_JOB_URL:啟動 BigQuery 載入工作的函式網址
    • CREATE_QUERY_URL:啟動 BigQuery 查詢工作的函式網址
    • BQ_REGION:資料儲存的 BigQuery 區域,例如 US
    • BQ_DATASET_TABLE_NAME:BigQuery 資料集資料表名稱,格式為 PROJECT_ID.serverless_elt_dataset.word_count
  2. 部署 workflow 檔案:

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    更改下列內容:

    • WORKFLOW_NAME:工作流程的專屬名稱
    • WORKFLOW_REGION:工作流程部署所在的區域,例如 us-central1
    • WORKFLOW_DESCRIPTION:工作流程的說明
  3. 建立 Python 3 虛擬環境,並安裝檔案產生器的必要條件:

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

產生要匯入的檔案

gen.py Python 指令碼會以 Avro 格式產生隨機內容。結構定義與 BigQuery word_count 資料表相同。這些 Avro 檔案會複製到指定的 Cloud Storage bucket。

在 Cloud Shell 中產生檔案:

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

更改下列內容:

  • RECORDS_PER_FILE:單一檔案中的記錄數
  • NUM_FILES:要上傳的檔案總數
  • FILE_PREFIX:產生檔案名稱的前置字串

在 Firestore 中查看檔案項目

檔案複製到 Cloud Storage 時,會觸發 handle_new_file Cloud Run 函式。這個函式會將檔案清單新增至 Firestore jobs 集合中 new 文件內的檔案清單陣列。

如要查看檔案清單,請前往 Google Cloud 控制台的 Firestore「資料」頁面。

前往「資料」

加入集合的檔案清單。

觸發工作流程

Workflows 會將一系列無伺服器工作串連在一起,這些工作來自Google Cloud 和 API 服務。這項工作流程中的個別步驟會以 Cloud Run 函式執行,狀態則儲存在 Firestore 中。所有對 Cloud Run 函式的呼叫都會使用工作流程的服務帳戶進行驗證。

在 Cloud Shell 中執行工作流程:

gcloud workflows execute WORKFLOW_NAME

下圖顯示工作流程中使用的步驟:

主要和子工作流程中使用的步驟。

工作流程分為兩部分:主要工作流程和子工作流程。 主要工作流程會處理工作建立和條件式執行作業,而子工作流程則會執行 BigQuery 工作。工作流程會執行下列作業:

  • create_job Cloud Run 函式會建立新的工作物件、從 Firestore 文件取得新增至 Cloud Storage 的檔案清單,並將檔案與載入工作建立關聯。如果沒有要載入的檔案,函式就不會建立新工作。
  • create_query Cloud Run 函式會接收需要執行的查詢,以及查詢應執行的 BigQuery 區域。函式會在 Firestore 中建立工作,並傳回工作 ID。
  • run_bigquery_job Cloud Run 函式會取得需要執行的工作 ID,然後呼叫 BigQuery API 提交工作。
  • 您不必等待 Cloud Run 函式完成工作,可以定期輪詢工作狀態。
    • poll_bigquery_job Cloud Run 函式會提供工作的狀態。系統會重複呼叫這個函式,直到工作完成為止。
    • 如要在呼叫 poll_bigquery_job Cloud Run 函式之間加入延遲時間,請從 Workflows 呼叫 sleep 常式

查看工作狀態

您可以查看檔案清單和工作狀態。

  1. 前往Google Cloud 控制台的 Firestore「資料」頁面。

    前往「資料」

  2. 系統會為每項工作產生專屬 ID (UUID)。如要查看 job_typestatus,請按一下工作 ID。每項工作可能會有下列其中一種類型和狀態:

    • job_type:工作流程執行的工作類型,可使用下列其中一個值:

      • 0:將資料載入 BigQuery。
      • 1:在 BigQuery 中執行查詢。
    • status:工作目前的狀態,可能的值如下:

      • 0:工作已建立,但尚未啟動。
      • 1:工作正在執行。
      • 2:工作已順利執行完畢。
      • 3:發生錯誤,工作未順利完成。

    工作物件也包含中繼資料屬性,例如 BigQuery 資料集所屬的區域、BigQuery 資料表的名稱,以及 (如果是查詢工作) 執行的查詢字串。

檔案清單,並醒目顯示工作狀態。

查看 BigQuery 中的資料

如要確認 ELT 工作是否成功,請確認資料是否顯示在資料表中。

  1. 前往 Google Cloud 控制台的「BigQuery 編輯器」頁面。

    前往編輯器

  2. 按一下「serverless_elt_dataset.word_count」資料表。

  3. 按一下「預覽」分頁標籤。

    「預覽」分頁,顯示表格中的資料。

排定工作流程

如要依排程定期執行工作流程,可以使用 Cloud Scheduler