使用 Workflows 执行 Cloud Run 作业

借助 Workflows,您可以在工作流中执行 Cloud Run 作业,以执行更复杂的数据处理或编排现有作业的系统。

本教程演示了如何使用 Workflows 执行 Cloud Run 作业,该作业会处理作为环境变量传递给作业的数据,以响应来自 Cloud Storage 的事件。

请注意,您还可以将事件数据存储在 Cloud Storage 存储桶中,这样您就可以使用客户管理的加密密钥来加密数据。 如需了解详情,请参阅执行 Cloud Run 作业以处理保存在 Cloud Storage 中的事件数据

创建 Cloud Run 作业

本教程使用 GitHub 中的示例 Cloud Run 作业。该作业从 Cloud Storage 中的输入文件读取数据,并对文件中的每一行执行一些任意处理。

  1. 通过将示例应用代码库克隆到本地机器来获取示例代码:

    git clone https://github.com/GoogleCloudPlatform/jobs-demos.git

    或者,您也可以下载该示例的 ZIP 文件并将其解压缩。

  2. 转到包含示例代码的目录:

    cd jobs-demos/parallel-processing
  3. 创建一个 Cloud Storage 存储桶,用于存储可写入并触发事件的输入文件:

    控制台

    1. 在 Google Cloud 控制台中,转到 Cloud Storage 存储桶页面。

      进入“存储桶”

    2. 依次点击 add(添加)Create(创建)。
    3. 创建存储桶页面上,输入存储桶的名称:
      input-PROJECT_ID
      PROJECT_ID 替换为您的 Google Cloud 项目的 ID。
    4. 保留其他默认设置。
    5. 点击创建

    gcloud

    运行 gcloud storage buckets create 命令:

    gcloud storage buckets create gs://input-PROJECT_ID

    如果请求成功,该命令将返回以下消息:

    Creating gs://input-PROJECT_ID/...

    Terraform

    如需创建 Cloud Storage 存储桶,请使用 google_storage_bucket 资源并修改 main.tf 文件,如以下示例所示。

    如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

    请注意,在典型的 Terraform 工作流中,您会一次性应用整个计划。不过,在本教程中,您可以定位到特定资源。例如:

    terraform apply -target="random_id.bucket_name_suffix"

    terraform apply -target="google_storage_bucket.default"

    # Cloud Storage bucket names must be globally unique
    resource "random_id" "bucket_name_suffix" {
      byte_length = 4
    }
    
    # Create a Cloud Storage bucket
    resource "google_storage_bucket" "default" {
      name                        = "input-${data.google_project.project.name}-${random_id.bucket_name_suffix.hex}"
      location                    = "us-central1"
      storage_class               = "STANDARD"
      force_destroy               = false
      uniform_bucket_level_access = true
    }
  4. 创建一个 Artifact Registry 标准制品库,您可以在其中存储您的容器映像:

    控制台

    1. 在 Google Cloud 控制台中,前往 Artifact Registry 仓库页面:

      前往制品库

    2. 点击 创建代码库

    3. 输入代码库的名称,例如 my-repo。对于项目中的每个代码库位置,代码库名称不得重复。

    4. 保留默认格式,即 Docker

    5. 保留默认模式,该模式应为标准

    6. 对于区域,选择 us-central1(爱荷华)

    7. 保留所有其他默认设置。

    8. 点击创建

    gcloud

    运行以下命令:

    gcloud artifacts repositories create REPOSITORY \
        --repository-format=docker \
        --location=us-central1

    REPOSITORY 替换为制品库的唯一名称,例如 my-repo。对于项目中的每个代码库位置,代码库名称不得重复。

    Terraform

    如需创建 Artifact Registry 代码库,请使用 google_artifact_registry_repository 资源并修改 main.tf 文件,如以下示例所示。

    请注意,在典型的 Terraform 工作流中,您会一次性应用整个方案。不过,在本教程中,您可以定位特定资源。例如:

    terraform apply -target="google_artifact_registry_repository.default"

    # Create an Artifact Registry repository
    resource "google_artifact_registry_repository" "default" {
      location      = "us-central1"
      repository_id = "my-repo"
      format        = "docker"
    }
  5. 使用默认 Google Cloud buildpack 构建容器映像:

    export SERVICE_NAME=parallel-job
    gcloud builds submit \
        --pack image=us-central1-docker.pkg.dev/PROJECT_ID/REPOSITORY/${SERVICE_NAME}

    REPOSITORY 替换为您的 Artifact Registry 制品库的名称。

    构建可能需要几分钟才能完成。

  6. 创建用于部署容器映像的 Cloud Run 作业:

    控制台

    1. 在 Google Cloud 控制台中,前往 Cloud Run 页面:

      转到 Cloud Run

    2. 点击创建作业,以显示“创建作业”表单。

      1. 在该表单中,选择 us-central1-docker.pkg.dev/PROJECT_ID/REPOSITORY/parallel-job:latest 作为 Artifact Registry 容器映像网址。
      2. 可选:对于作业名称,输入 parallel-job
      3. 可选:对于区域,选择 us-central1(爱荷华)
      4. 对于要在作业中运行的任务数量,请输入 10。所有任务都必须成功完成,这样作业才能成功完成。默认情况下,任务是并行执行的。
    3. 展开容器、变量和密钥、连接、安全性部分,保留所有默认设置,但以下设置除外:

      1. 点击常规标签页。

        1. 对于容器命令,请输入 python
        2. 对于容器实参,请输入 process.py
      2. 点击变量和 Secret 标签页。

        1. 点击添加变量,然后输入 INPUT_BUCKET 作为名称,并输入 input-PROJECT_ID 作为值。
        2. 点击添加变量,然后输入 INPUT_FILE 作为名称,并输入 input_file.txt 作为值。
    4. 如需创建作业,请点击创建

    gcloud

    1. 设置默认 Cloud Run 区域:

      gcloud config set run/region us-central1
    2. 创建 Cloud Run 作业:

      gcloud run jobs create parallel-job \
          --image us-central1-docker.pkg.dev/PROJECT_ID/REPOSITORY/parallel-job:latest \
          --command python \
          --args process.py \
          --tasks 10 \
          --set-env-vars=INPUT_BUCKET=input-PROJECT_ID,INPUT_FILE=input_file.txt

      请注意,如果您未指定映像标记,Artifact Registry 将查找带有默认 latest 标记的映像。

      如需查看创建作业时可用的选项的完整列表,请参阅 gcloud run jobs create 命令行文档。

      作业创建完成后,您应该会看到一条指示成功的消息。

    Terraform

    如需创建 Cloud Run 作业,请使用 google_cloud_run_v2_job 资源并修改 main.tf 文件,如以下示例所示。

    请注意,在典型的 Terraform 工作流中,您会一次性应用整个方案。不过,在本教程中,您可以定位特定资源。例如:

    terraform apply -target="google_cloud_run_v2_job.default"

    # Create a Cloud Run job
    resource "google_cloud_run_v2_job" "default" {
      name     = "parallel-job"
      location = "us-central1"
    
      template {
        task_count = 10
        template {
          containers {
            image   = "us-central1-docker.pkg.dev/${data.google_project.project.name}/${google_artifact_registry_repository.default.repository_id}/parallel-job:latest"
            command = ["python"]
            args    = ["process.py"]
            env {
              name  = "INPUT_BUCKET"
              value = google_storage_bucket.default.name
            }
            env {
              name  = "INPUT_FILE"
              value = "input_file.txt"
            }
          }
        }
      }
    }

部署用于执行 Cloud Run 作业的工作流

定义并部署一个工作流,用于执行您刚刚创建的 Cloud Run 作业。工作流定义由一系列使用 Workflows 语法描述的步骤组成。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面:

    进入 Workflows

  2. 点击 创建

  3. 输入新工作流的名称,例如 cloud-run-job-workflow

  4. 对于区域,选择 us-central1(爱荷华)

  5. 服务账号字段中,选择您之前创建的服务账号。

    该服务账号充当工作流的身份。您应该已向服务账号授予 Cloud Run Admin 角色,以便工作流可以执行 Cloud Run 作业。

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的以下定义:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${event.data.bucket}
                    - event_file: ${event.data.name}
                    - target_bucket: ${"input-" + project_id}
                    - job_name: parallel-job
                    - job_location: us-central1
            - check_input_file:
                switch:
                    - condition: ${event_bucket == target_bucket}
                      next: run_job
                    - condition: true
                      next: end
            - run_job:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  8. 点击部署

gcloud

  1. 为工作流创建源代码文件:

    touch cloud-run-job-workflow.yaml
  2. 将以下工作流定义复制到您的源代码文件中:

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${event.data.bucket}
                    - event_file: ${event.data.name}
                    - target_bucket: ${"input-" + project_id}
                    - job_name: parallel-job
                    - job_location: us-central1
            - check_input_file:
                switch:
                    - condition: ${event_bucket == target_bucket}
                      next: run_job
                    - condition: true
                      next: end
            - run_job:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  3. 输入以下命令以部署工作流:

    gcloud workflows deploy cloud-run-job-workflow \
        --location=us-central1 \
        --source=cloud-run-job-workflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    替换以下内容:

    • SERVICE_ACCOUNT_NAME:您之前创建的服务账号的名称
    • PROJECT_ID:您的Google Cloud 项目的 ID

    该服务账号充当工作流的身份。您应该已向服务账号授予 roles/run.admin 角色,以便工作流可以执行 Cloud Run 作业。

Terraform

如需创建工作流,请使用 google_workflows_workflow 资源并修改 main.tf 文件,如以下示例所示。

如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

请注意,在典型的 Terraform 工作流中,您会一次性应用整个方案。不过,在本教程中,您可以定位特定资源。例如:

terraform apply -target="google_workflows_workflow.default"

# Create a workflow
resource "google_workflows_workflow" "default" {
  name        = "cloud-run-job-workflow"
  region      = "us-central1"
  description = "Workflow that routes a Cloud Storage event and executes a Cloud Run job"

  deletion_protection = false # set to "true" in production

  # Note that $$ is needed for Terraform
  source_contents = <<EOF
  main:
      params: [event]
      steps:
          - init:
              assign:
                  - project_id: $${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                  - event_bucket: $${event.data.bucket}
                  - event_file: $${event.data.name}
                  - target_bucket: "${google_storage_bucket.default.name}"
                  - job_name: parallel-job
                  - job_location: us-central1
          - check_input_file:
              switch:
                  - condition: $${event_bucket == target_bucket}
                    next: run_job
                  - condition: true
                    next: end
          - run_job:
              call: googleapis.run.v1.namespaces.jobs.run
              args:
                  name: $${"namespaces/" + project_id + "/jobs/" + job_name}
                  location: $${job_location}
                  body:
                      overrides:
                          containerOverrides:
                              env:
                                  - name: INPUT_BUCKET
                                    value: $${event_bucket}
                                  - name: INPUT_FILE
                                    value: $${event_file}
              result: job_execution
          - finish:
              return: $${job_execution}
  EOF
}

该工作流会执行以下操作:

  1. init 步 - 接受 Cloud Storage 事件作为实参,然后设置必要的变量。

  2. check_input_file 步骤 - 检查事件中指定的 Cloud Storage 存储桶是否为 Cloud Run 作业使用的存储桶。

    • 如果为“是”,工作流将继续执行 run_job 步骤。
    • 如果否,工作流会终止,停止任何进一步的处理。
  3. run_job 步骤 - 使用 Cloud Run Admin API 连接器的 googleapis.run.v1.namespaces.jobs.run 方法来执行作业。Cloud Storage 存储桶和数据文件名会作为替换变量从工作流传递到作业。

  4. finish 步骤 - 返回有关作业执行的信息,作为工作流的结果。

为工作流创建 Eventarc 触发器

如需在输入数据文件更新时自动执行工作流,进而执行 Cloud Run 作业,请创建一个 Eventarc 触发器,以响应包含输入数据文件的存储桶中的 Cloud Storage 事件。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面:

    进入 Workflows

  2. 点击工作流的名称,例如 cloud-run-job-workflow

  3. 工作流详细信息页面上,点击 修改

  4. 修改工作流页面上的触发器部分中,点击添加新触发器 > Eventarc

    Eventarc 触发器窗格随即会打开。

  5. 触发器名称字段中,输入触发器的名称,例如 cloud-run-job-workflow-trigger

  6. 事件提供方列表中,选择 Cloud Storage

  7. 事件列表中,选择 google.cloud.storage.object.v1.finalized

  8. 存储桶字段中,选择包含输入数据文件的存储桶。存储桶名称的格式为 input-PROJECT_ID

  9. 服务账号字段中,选择您之前创建的服务账号。

    该服务账号充当触发器的身份。您应该已向服务账号授予以下角色:

    • Eventarc Event Receiver:用于接收事件
    • Workflows Invoker:用于执行工作流
  10. 点击保存触发器

    Eventarc 触发器现在显示在修改工作流页面上的触发器部分中。

  11. 点击下一步

  12. 点击部署

gcloud

运行以下命令,创建 Eventarc 触发器:

gcloud eventarc triggers create cloud-run-job-workflow-trigger \
    --location=us \
    --destination-workflow=cloud-run-job-workflow  \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.storage.object.v1.finalized" \
    --event-filters="bucket=input-PROJECT_ID" \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

替换以下内容:

  • PROJECT_ID:您的 Google Cloud 项目的 ID
  • SERVICE_ACCOUNT_NAME:您之前创建的服务账号的名称。

该服务账号充当触发器的身份。您应该已向服务账号授予以下角色:

  • roles/eventarc.eventReceiver:接收事件
  • roles/workflows.invoker:执行工作流

Terraform

如需创建触发器,请使用 google_eventarc_trigger 资源并修改 main.tf 文件,如以下示例所示。

如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

请注意,在典型的 Terraform 工作流中,您会一次性应用整个方案。不过,在本教程中,您可以定位特定资源。例如:

terraform apply -target="google_eventarc_trigger.default"

# Create an Eventarc trigger that routes Cloud Storage events to Workflows
resource "google_eventarc_trigger" "default" {
  name     = "cloud-run-job-trigger"
  location = google_workflows_workflow.default.region

  # Capture objects changed in the bucket
  matching_criteria {
    attribute = "type"
    value     = "google.cloud.storage.object.v1.finalized"
  }
  matching_criteria {
    attribute = "bucket"
    value     = google_storage_bucket.default.name
  }

  # Send events to Workflows
  destination {
    workflow = google_workflows_workflow.default.id
  }

  service_account = google_service_account.workflows.email

}

每当包含输入数据文件的 Cloud Storage 存储桶中有文件上传或被覆盖时,系统都会执行工作流,并将相应的 Cloud Storage 事件作为实参。

触发工作流

通过更新 Cloud Storage 中的输入数据文件来测试端到端系统。

  1. 为输入文件生成新数据,并将其上传到 Cloud Storage 中 Cloud Run 作业预期会找到的位置:

    base64 /dev/urandom | head -c 100000 >input_file.txt
    gcloud storage cp input_file.txt gs://input-PROJECT_ID/input_file.txt

    如果您使用 Terraform 创建了 Cloud Storage 存储桶,可以运行以下命令来检索该存储桶的名称:

    gcloud storage buckets list gs://input*

    Cloud Run 作业可能需要几分钟才能运行完毕。

  2. 查看作业执行情况,确认 Cloud Run 作业是否按预期运行:

    gcloud config set run/region us-central1
    gcloud run jobs executions list --job=parallel-job

    您应该会在输出中看到作业执行成功,表明 10/10 个任务已完成。

详细了解如何使用事件或 Pub/Sub 消息触发工作流