使用 Workflows 将数据从 Cloud Storage 加载到 BigQuery

本教程介绍如何使用 WorkflowsCloud Run 函数Firestore,将原始数据(例如事件日志)从 Cloud Storage 加载到 BigQuery。Analytics 平台通常具有一个编排工具,用于定期使用 BigQuery 作业在 BigQuery 中加载数据,然后使用 SQL 语句来转换数据(包括 BigQuery 过程语言语句)以提供业务指标。本教程适用于想要构建无服务器、事件驱动型数据处理流水线的开发者和架构师。本教程假定您熟悉 YAML、SQL 和 Python。

架构

下图显示了使用 Workflows的无服务器提取、加载和转换 (ELT) 流水线的概要架构。

提取、加载和转换流水线。

在上图中,假设一个零售平台定期从各个商店以文件形式收集销售事件,然后将文件写入 Cloud Storage 存储分区。事件用于通过在 BigQuery 中导入和处理来提供业务指标。此架构提供了一个用于将文件导入到 BigQuery 中的可靠、无服务器的编排系统,它分为以下两个模块:

  • 文件列表:维护已添加到 Cloud Storage 存储分区内 Firestore 集合中的未处理文件的列表。此模块通过 Cloud Run 函数工作,该函数通过对象完成存储事件触发,而该事件在将新文件添加到 Cloud Storage 存储桶时生成。文件名会附加到 Firestore 中名为 new 的集合 files 数组中。
  • 工作流:运行计划的工作流。Cloud Scheduler 会触发一个工作流,它根据基于 YAML 的语法运行一系列步骤来编排加载,然后通过调用 Cloud Run 函数来转换 BigQuery 中的数据。工作流中的步骤调用 Cloud Run functions 函数以运行以下任务:

    • 创建并启动 BigQuery 加载作业。
    • 轮询加载作业状态。
    • 创建并启动转换查询作业。
    • 轮询转换作业状态。

通过使用事务来维护 Firestore 中的新文件列表,可帮助确保在工作流将这些文件导入 BigQuery 中时不会丢失任何文件。通过将作业元数据和状态存储在 Firestore 中,可以使得工作流的单独运行具有幂等性。

准备环境

若要准备环境,请创建 Firestore 数据库,从 GitHub 代码库克隆代码示例,使用 Terraform 创建资源,修改工作流 YAML 文件,并安装文件生成器的要求。

  1. 如需创建 Firestore 数据库,请执行以下操作:

    1. 在 Google Cloud 控制台中,前往 Firestore 页面。

      转到 Firestore

    2. 点击选择原生模式

    3. 选择位置菜单中,选择要托管 Firestore 数据库的区域。我们建议选择靠近您的物理位置的区域。

    4. 点击创建数据库

  2. 在 Cloud Shell 中,克隆源代码库:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. 在 Cloud Shell 中,使用 Terraform 创建以下资源:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    替换以下内容:

    • PROJECT_ID:您的 Google Cloud 项目 ID
    • REGION:用于托管资源的特定 Google Cloud地理位置,例如 us-central1
    • ZONE:用于托管资源的区域内的位置,例如 us-central1-b

    您应该会看到如下所示的消息: Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    Terraform 可帮助您以可预测的方式安全地大规模创建、更改和升级基础架构。系统会在您的项目中创建以下资源:

    • 具有必需权限的服务账号,用于确保安全访问您的资源。
    • 名为 serverless_elt_dataset 的 BigQuery 数据集和名为 word_count 的表,用于加载传入的文件。
    • 名为 ${project_id}-ordersbucket 的 Cloud Storage 存储分区,用于暂存输入文件。
    • 以下五个 Cloud Run 函数:
      • file_add_handler 用于将添加到 Cloud Storage 存储分区的文件的名称添加到 Firestore 集合。
      • create_job 用于创建一个新的 BigQuery 加载作业,并将 Firebase 集合中的文件与该作业相关联。
      • create_query 用于创建一个新的 BigQuery 查询作业。
      • poll_bigquery_job 用于获取 BigQuery 作业的状态。
      • run_bigquery_job 用于启动 BigQuery 作业。
  4. 获取您在上一步中部署的 create_jobcreate_querypoll_jobrun_bigquery_job Cloud Run functions 函数的网址。

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    输出类似于以下内容:

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    请记下这些网址,因为在部署工作流时需要用到它们。

创建和部署工作流

  1. 在 Cloud Shell 中,打开工作流的源文件 workflow.yaml

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    替换以下内容:

    • CREATE_JOB_URL:用于创建新作业的函数的网址
    • POLL_BIGQUERY_JOB_URL:用于轮询运行中作业状态的函数的网址
    • RUN_BIGQUERY_JOB_URL:用于启动 BigQuery 加载作业的函数的网址
    • CREATE_QUERY_URL:用于启动 BigQuery 查询作业的函数的网址
    • BQ_REGION:存储数据的 BigQuery 区域,例如 US
    • BQ_DATASET_TABLE_NAME:BigQuery 数据集表名称,格式为 PROJECT_ID.serverless_elt_dataset.word_count
  2. 部署 workflow 文件:

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    替换以下内容:

    • WORKFLOW_NAME:工作流的唯一名称
    • WORKFLOW_REGION:在其中部署此工作流的区域,例如 us-central1
    • WORKFLOW_DESCRIPTION:工作流的说明
  3. 创建 Python 3 虚拟环境并安装文件生成器的必备项目:

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

生成要导入的文件

gen.py Python 脚本会生成 Avro 格式的随机内容。架构与 BigQuery word_count 表相同。这些 Avro 文件会复制到指定的 Cloud Storage 存储桶中。

在 Cloud Shell 中,生成以下文件:

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

替换以下内容:

  • RECORDS_PER_FILE:单个文件中的记录数
  • NUM_FILES:要上传的文件总数
  • FILE_PREFIX:已生成文件的名称的前缀

查看 Firestore 中的文件条目

将文件复制到 Cloud Storage 时,会触发 handle_new_file Cloud Run 函数。该函数会将文件列表添加到 Firestore jobs 集合的 new 文档的文件列表数组中。

如需查看文件列表,请在 Google Cloud 控制台中前往 Firestore 数据页面。

前往“数据”页面

已添加到集合的文件的列表。

触发工作流

Workflows 将一系列无服务器任务与Google Cloud 和 API 服务相关联。此工作流中的各个步骤以 Cloud Run 函数的形式运行,并且状态存储在 Firestore 中。所有对 Cloud Run functions 的调用都使用工作流的服务账号进行身份验证。

在 Cloud Shell 中,运行工作流:

gcloud workflows execute WORKFLOW_NAME

下图显示了在工作流中使用的步骤:

在主工作流和子工作流中使用的步骤。

工作流分为两个部分:主工作流和子工作流。主工作流处理作业创建和有条件执行,子工作流执行 BigQuery 作业。工作流会执行以下操作:

  • create_job Cloud Run 函数会创建新的作业对象、获取从 Firestore 文档添加到 Cloud Storage 的文件列表并将这些文件与加载作业相关联。如果没有要加载的文件,函数将不会创建新作业。
  • create_query Cloud Run 函数接受这样的查询:需要与应在其中执行查询的 BigQuery 区域一起执行的查询。该函数在 Firestore 中创建作业并返回作业 ID。
  • run_bigquery_job Cloud Run 函数获取需要执行的作业的 ID,然后调用 BigQuery API 来提交作业。
  • 您可以定期轮询作业状态,而不是等待 Cloud Run 函数中的作业完成。
    • poll_bigquery_job Cloud Run 函数提供作业的状态。系统会重复调用该函数,直到作业完成。
    • 如需在对 poll_bigquery_job Cloud Run 函数的调用之间添加延迟,请从 Workflows 中调用 sleep 例程

查看作业状态

您可以查看文件列表和作业状态。

  1. 在Google Cloud 控制台中,前往 Firestore 数据页面。

    前往“数据”页面

  2. 系统会为每个作业生成一个唯一标识符 (UUID)。如需查看 job_typestatus,请点击作业 ID。每个作业可能具有以下类型之一和状态:

    • job_type:工作流正在运行的作业的类型,值为以下值之一:

      • 0:将数据加载到 BigQuery 中。
      • 1:在 BigQuery 中运行查询。
    • status:作业的当前状态,值为以下值之一:

      • 0:作业已创建,但尚未开始。
      • 1:作业正在运行。
      • 2:作业已成功完成其执行。
      • 3:出现错误,作业未成功完成。

    作业对象还包含元数据属性,例如 BigQuery 数据集所在的区域、BigQuery 表的名称以及正在运行的查询字符串(如果作业是查询作业的话)。

突出显示了作业状态的文件列表。

在 BigQuery 中查看数据

如需确认 ELT 作业是否成功,请验证数据是否显示在表中。

  1. 在 Google Cloud 控制台中,前往 BigQuery 编辑器页面。

    前往编辑器

  2. 点击 serverless_elt_dataset.word_count 表。

  3. 点击预览标签页。

    在表格中显示数据的“预览”标签页。

安排工作流

如需定期按计划运行工作流,您可以使用 Cloud Scheduler