使用 Cloud Tasks 队列缓冲工作流执行

本教程介绍如何创建可调节工作流执行速率的 Cloud Tasks 队列。

可以同时进行的工作流执行次数存在上限。一旦此配额用尽,并且如果执行积压处于停用状态,或者如果积压的执行作业达到配额上限,则任何新的执行作业都会失败,并返回 HTTP 429 Too many requests 状态代码。通过使 Cloud Tasks 队列能够以您定义的速率执行子工作流,您可以避免与 Workflows 配额相关的问题,并实现更高的执行速率。

请注意,Cloud Tasks 旨在提供“至少一次”提交;不过,对于来自 Cloud Tasks 的重复请求,Workflows 无法确保只处理一次。

在下图中,父工作流会调用受应用了调度率的 Cloud Tasks 队列监管的子工作流。

父工作流通过 Cloud Tasks 队列调用子工作流的迭代

创建 Cloud Tasks 队列

创建可在父工作流中使用的 Cloud Tasks 队列,并允许您控制工作流执行速率。

控制台

  1. 在 Google Cloud 控制台中,前往 Cloud Tasks 页面:

    转至 Cloud Tasks

  2. 点击 创建推送队列

  3. 输入队列名称,即 queue-workflow-child

  4. 区域列表中,选择 us-central1(爱荷华)

  5. 点击创建

gcloud

QUEUE=queue-workflow-child
LOCATION=us-central1
gcloud tasks queues create $QUEUE --location=$LOCATION

创建和部署子工作流

子工作流可以接收和处理来自父工作流的数据。创建并部署一个执行以下操作的子工作流:

  • 接收 iteration 作为实参
  • 休眠 10 秒以模拟某些处理
  • 成功执行后返回字符串

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面。

    进入 Workflows

  2. 点击 创建

  3. 输入新工作流的名称 workflow-child

  4. 区域列表中,选择 us-central1(爱荷华)

  5. 服务账号列表中,选择 Compute Engine 默认服务账号

  6. 点击下一步

  7. 在工作流编辑器中,输入工作流的定义:

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  8. 点击部署

gcloud

  1. 为工作流创建源代码文件:

    touch workflow-child.yaml
  2. 在文本编辑器中打开源代码文件,然后将以下工作流复制到该文件中。

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  3. 部署工作流:

    gcloud workflows deploy workflow-child \
        --source=workflow-child.yaml \
        --location=us-central1 \
        --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

创建和部署父工作流

父工作流使用 for 循环执行子工作流的多个分支。

  1. 复制定义父工作流的源代码:

    main:
      steps:
        - init:
            assign:
              - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - project_number: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
              - location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
              - workflow_child_name: "workflow-child"
              - queue_name: "queue-workflow-child"
        - enqueue_tasks_to_execute_child_workflow:
            for:
              value: iteration
              range: [1, 100]
              steps:
                  - iterate:
                      assign:
                        - data:
                            iteration: ${iteration}
                        - exec:
                            # Encode object to JSON string in expression for workflow argument
                            argument: ${json.encode_to_string(data)}
                  - create_task_to_execute_child_workflow:
                      call: googleapis.cloudtasks.v2.projects.locations.queues.tasks.create
                      args:
                          parent: ${"projects/" + project_id + "/locations/" + location + "/queues/" + queue_name}
                          body:
                            task:
                              httpRequest:
                                body: ${base64.encode(json.encode(exec))}
                                url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/workflows/" + workflow_child_name + "/executions"}
                                oauthToken:
                                  serviceAccountEmail: ${project_number + "-compute@developer.gserviceaccount.com"}

    工作流包括以下部分:

    • 用于分配引用子工作流和 Cloud Tasks 队列名称的常量的映射。如需了解详情,请参阅地图

    • 一种 for 循环,用于以迭代方式调用子工作流。 如需了解详情,请参阅迭代

    • 一种工作流步骤,用于创建大量任务并将其添加到 Cloud Tasks 队列中,以执行子工作流。如需了解详情,请参阅 Cloud Tasks API 连接器

  2. 部署工作流:

    控制台

    1. 在 Google Cloud 控制台中,前往 Workflows 页面:

      进入 Workflows

    2. 点击 创建

    3. 输入新工作流的名称 workflow-parent

    4. 区域列表中,选择 us-central1(爱荷华)

    5. 服务账号列表中,选择 Compute Engine 默认服务账号

    6. 点击下一步

    7. 在工作流编辑器中,粘贴父工作流的定义。

    8. 点击部署

    gcloud

    1. 为工作流创建源代码文件:

      touch workflow-parent.yaml
    2. 在文本编辑器中打开源代码文件,然后粘贴父工作流的定义。

    3. 部署工作流:

      gcloud workflows deploy workflow-parent \
          --source=workflow-parent.yaml \
          --location=us-central1 \
          --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

执行无速率限制的父工作流

执行父工作流,以通过 Cloud Tasks 队列调用子工作流。执行大约需要 10 秒才能完成。

控制台

  1. 在 Google Cloud 控制台中,前往 Workflows 页面:

    进入 Workflows

  2. 工作流页面上,点击 workflow-parent 工作流以转到其详情页面。

  3. 工作流详情页面上,点击 执行

  4. 再次点击执行

  5. 在父工作流运行时,返回到工作流页面,然后点击 workflow-child 工作流以转到其详情页面。

  6. 点击执行标签页。

    您应该会看到子工作流的执行情况,这些执行大约在同一时间运行,类似于以下内容:

    同时运行的子工作流执行的详细信息。

gcloud

  1. 执行工作流:

    gcloud workflows run workflow-parent \
         --location=us-central1
  2. 如需验证工作流执行是否已触发,请列出最后四项执行:

    gcloud workflows executions list workflow-child --limit=4

    由于执行次数 (100) 低于 Workflows 并发限制,因此结果应与以下内容类似。如果您同时提交数千次执行,可能会出现配额问题。

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/1570d06e-d133-4536-a859-b7b6a1a85524
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:15.093934448Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/82724960-7d92-4961-aa2c-a0f0be46212c
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.903007626Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/598126fb-37f9-45bc-91d8-aea7d795d702
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.698260524Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/d2e9960b-f93f-4df4-a594-3e7e5c2be53f
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.503818840Z
    END_TIME: 

您已创建并部署了一个工作流,该工作流会调用子工作流 100 次。

执行具有速率限制的父工作流

对 Cloud Tasks 队列应用每秒一次的调度速率限制,然后执行父工作流。

控制台

  1. 在 Google Cloud 控制台中,前往 Cloud Tasks 页面:

    转至 Cloud Tasks

  2. 点击您创建的 Cloud Tasks 队列 queue-workflow-child,然后点击修改队列

  3. 任务分派速率限制部分中,为最大分派数字段输入 1

  4. 点击保存

  5. 前往 Workflows 页面:

    进入 Workflows

  6. 点击 workflow-parent 工作流以转到其详情页面。

  7. 工作流详情页面上,点击 执行

  8. 再次点击执行

  9. 在父工作流运行时,返回到工作流页面,然后点击 workflow-child 工作流以转到其详情页面。

  10. 点击执行标签页。

    您应该会看到子工作流的执行情况,以每秒一次请求的速度运行,如下所示:

    以每秒一次的频率执行的子工作流的详细信息。

gcloud

  1. 更新 Cloud Tasks 队列,以应用每秒一次调度的速率限制:

    gcloud tasks queues update $QUEUE \
        --max-dispatches-per-second=1 \
        --location=us-central1
  2. 执行工作流:

    gcloud workflows run workflow-parent \
       --location=us-central1
  3. 如需验证工作流执行是否已触发,请列出最后四项执行:

    gcloud workflows executions list workflow-child --limit=4

    结果应类似于以下内容,每秒执行一次工作流:

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/becf4957-9fb2-40d9-835d-0ff2dd0c1249
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:24.446361457Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/6c1e7c4b-7ac6-4121-b351-1e2d56d10903
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:23.448213989Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/f2ba5027-af40-4cd3-8cd0-b8033bcc6211
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:22.431485914Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/ecc61ee5-fe87-49eb-8803-89dba929f6c8
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:21.443466369Z
    END_TIME: 

您已成功部署一个工作流,该工作流以每秒一次的调度速率调用子工作流 100 次迭代。