Cloud Tasks キューを使用してワークフローの実行をバッファリングする

このチュートリアルでは、ワークフローの実行レートを規制できる Cloud Tasks キューを作成する方法について説明します。

同時に実行できるアクティブなワークフロー実行には最大数があります。この割り当てが使い果たされ、実行バックログが無効になっている場合、またはバックログ実行の割り当てに達した場合、新しい実行は HTTP 429 Too many requests ステータス コードで失敗します。Cloud Tasks キューで定義したレートで子ワークフローを実行できるようにすることで、Workflows の割り当て関連の問題を回避し、実行速度を向上させることが可能です。

Cloud Tasks は、「少なくとも 1 回」を基本に処理を行うように設計されています。ただし、Workflows は Cloud Tasks からの重複リクエストの 1 回限りの処理が保証されません。

次の図では、親ワークフローが、ディスパッチ レートが適用された Cloud Tasks キューによって規制される子ワークフローを呼び出しています。

Cloud Tasks キューを介して子ワークフローのイテレーションを呼び出す親ワークフロー

Cloud Tasks キューを作成する

親ワークフローで使用でき、ワークフローの実行レートを規制できる Cloud Tasks キューを作成します。

コンソール

  1. Google Cloud コンソールで、[Cloud Tasks] ページに移動します。

    Cloud Tasks に移動

  2. [push キューを作成] をクリックします。

  3. [キュー名] に「queue-workflow-child」と入力します。

  4. [リージョン] リストで [us-central1 (Iowa)] を選択します。

  5. [作成] をクリックします。

gcloud

QUEUE=queue-workflow-child
LOCATION=us-central1
gcloud tasks queues create $QUEUE --location=$LOCATION

子ワークフローを作成してデプロイする

子ワークフローは、親ワークフローからデータを受け取って処理できます。次の処理を行う子ワークフローを作成してデプロイします。

  • 引数として iteration を受け取る
  • 一部の処理をシミュレートするために10 秒間スリープする
  • 実行が成功すると文字列を返す

コンソール

  1. Google Cloud コンソールで [ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [ 作成] をクリックします。

  3. 新しいワークフローに「workflow-child」という名前を入力します。

  4. [リージョン] リストで [us-central1 (Iowa)] を選択します。

  5. [サービス アカウント] リストで、[Compute Engine のデフォルトのサービス アカウント] を選択します。

  6. [次へ] をクリックします。

  7. ワークフロー エディタで、次のワークフローの定義を入力します。

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  8. [デプロイ] をクリックします。

gcloud

  1. ワークフローのソースコード ファイルを作成します。

    touch workflow-child.yaml
  2. テキスト エディタでソースコード ファイルを開き、次のワークフローをファイルにコピーします。

    main:
      params: [args]
      steps:
        - init:
            assign:
              - iteration : ${args.iteration}
        - wait:
            call: sys.sleep
            args:
                seconds: 10
        - return_message:
            return: ${"Hello world"+iteration}
  3. ワークフローをデプロイします。

    gcloud workflows deploy workflow-child \
        --source=workflow-child.yaml \
        --location=us-central1 \
        --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

親ワークフローを作成してデプロイする

親ワークフローは、for ループを使用して子ワークフローの複数のブランチを実行します。

  1. 親ワークフローを定義するソースコードをコピーします。

    main:
      steps:
        - init:
            assign:
              - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - project_number: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
              - location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
              - workflow_child_name: "workflow-child"
              - queue_name: "queue-workflow-child"
        - enqueue_tasks_to_execute_child_workflow:
            for:
              value: iteration
              range: [1, 100]
              steps:
                  - iterate:
                      assign:
                        - data:
                            iteration: ${iteration}
                        - exec:
                            # Encode object to JSON string in expression for workflow argument
                            argument: ${json.encode_to_string(data)}
                  - create_task_to_execute_child_workflow:
                      call: googleapis.cloudtasks.v2.projects.locations.queues.tasks.create
                      args:
                          parent: ${"projects/" + project_id + "/locations/" + location + "/queues/" + queue_name}
                          body:
                            task:
                              httpRequest:
                                body: ${base64.encode(json.encode(exec))}
                                url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/workflows/" + workflow_child_name + "/executions"}
                                oauthToken:
                                  serviceAccountEmail: ${project_number + "-compute@developer.gserviceaccount.com"}

    このワークフローは次のパートで構成されています。

    • 子ワークフローと Cloud Tasks キューの名前を参照する定数を割り当てるために使用されるマップ。詳細については、マップをご覧ください。

    • 子ワークフローを繰り返し呼び出すために実行される for ループ。詳細については、イテレーションをご覧ください。

    • 子ワークフローを実行するために、多数のタスクを作成して Cloud Tasks キューに追加するワークフロー ステップ。詳細については、Cloud Tasks API コネクタをご覧ください。

  2. ワークフローをデプロイします。

    コンソール

    1. Google Cloud コンソールで、[ワークフロー] ページに移動します。

      [ワークフロー] に移動

    2. [ 作成] をクリックします。

    3. 新しいワークフローに「workflow-parent」という名前を入力します。

    4. [リージョン] リストで [us-central1 (Iowa)] を選択します。

    5. [サービス アカウント] リストで、[Compute Engine のデフォルトのサービス アカウント] を選択します。

    6. [次へ] をクリックします。

    7. ワークフロー エディタで、親ワークフローの定義を貼り付けます。

    8. [デプロイ] をクリックします。

    gcloud

    1. ワークフローのソースコード ファイルを作成します。

      touch workflow-parent.yaml
    2. テキスト エディタでソースコード ファイルを開き、親ワークフローの定義を貼り付けます。

    3. ワークフローをデプロイします。

      gcloud workflows deploy workflow-parent \
          --source=workflow-parent.yaml \
          --location=us-central1 \
          --service-account=PROJECT_NUMBER-compute@developer.gserviceaccount.com

レート制限なしで親ワークフローを実行する

親ワークフローを実行し、Cloud Tasks キューを介して子ワークフローを呼び出します。実行が完了するまでに 10 秒ほどかかります。

コンソール

  1. Google Cloud コンソールで、[ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [ワークフロー] ページで、[workflow-parent] ワークフローをクリックして詳細ページに移動します。

  3. [ワークフローの詳細] ページで [ 実行] を選択します。

  4. もう一度 [Execute] をクリックします。

  5. 親ワークフローが実行中の場合は、[ワークフロー] ページに戻り、[workflow-child] ワークフローをクリックして詳細ページに移動します。

  6. [Executions] タブをクリックします。

    以下のように、子ワークフローの実行がほぼ同じ時間に実行されていることがわかります。

    ほぼ同時に実行されている子ワークフローの実行の詳細。

gcloud

  1. ワークフローを実行します。

    gcloud workflows run workflow-parent \
         --location=us-central1
  2. ワークフローの実行がトリガーされたことを確認するには、直近の 4 つの実行を一覧表示します。

    gcloud workflows executions list workflow-child --limit=4

    実行数(100)が Workflows の同時実行数の制限を下回っているため、結果は次のようになります。数千件の実行を同時に送信すると、割り当ての問題が発生する可能性があります。

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/1570d06e-d133-4536-a859-b7b6a1a85524
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:15.093934448Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/82724960-7d92-4961-aa2c-a0f0be46212c
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.903007626Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/598126fb-37f9-45bc-91d8-aea7d795d702
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.698260524Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/d2e9960b-f93f-4df4-a594-3e7e5c2be53f
    STATE: ACTIVE
    START_TIME: 2023-07-27T00:56:14.503818840Z
    END_TIME: 

子ワークフローのイテレーションを 100 回呼び出すワークフローを作成してデプロイしました。

レート制限を使用して親ワークフローを実行する

Cloud Tasks キューに 1 秒あたり 1 回のディスパッチのレート制限を適用し、親ワークフローを実行します。

コンソール

  1. Google Cloud コンソールで、[Cloud Tasks] ページに移動します。

    Cloud Tasks に移動

  2. [queue-workflow-child] をクリックし、作成した Cloud Tasks キューをクリックして、[キューを編集] をクリックします。

  3. [タスク ディスパッチのレート上限] セクションの [最大ディスパッチ数] フィールドに「1」と入力します。

  4. [保存] をクリックします。

  5. [Workflows] ページに移動

    [ワークフロー] に移動

  6. [workflow-parent] ワークフローをクリックして、詳細ページに移動します。

  7. [ワークフローの詳細] ページで [ 実行] を選択します。

  8. もう一度 [Execute] をクリックします。

  9. 親ワークフローが実行中の場合は、[ワークフロー] ページに戻り、[workflow-child] ワークフローをクリックして詳細ページに移動します。

  10. [Executions] タブをクリックします。

    以下のように、子ワークフローの実行が 1 秒あたり 1 件のリクエストで実行されていることがわかります。

    1 秒あたりのリクエストで実行される子ワークフローの詳細。

gcloud

  1. Cloud Tasks キューを更新して、1 秒あたり 1 回のディスパッチのレート制限を適用します。

    gcloud tasks queues update $QUEUE \
        --max-dispatches-per-second=1 \
        --location=us-central1
  2. ワークフローを実行します。

    gcloud workflows run workflow-parent \
       --location=us-central1
  3. ワークフローの実行がトリガーされたことを確認するには、直近の 4 つの実行を一覧表示します。

    gcloud workflows executions list workflow-child --limit=4

    結果は次のようになり、1 秒間に 1 つのワークフローが実行されます。

    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/becf4957-9fb2-40d9-835d-0ff2dd0c1249
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:24.446361457Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/6c1e7c4b-7ac6-4121-b351-1e2d56d10903
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:23.448213989Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/f2ba5027-af40-4cd3-8cd0-b8033bcc6211
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:22.431485914Z
    END_TIME:
    NAME: projects/620278351741/locations/us-central1/workflows/workflow-child/executions/ecc61ee5-fe87-49eb-8803-89dba929f6c8
    STATE: ACTIVE
    START_TIME: 2023-07-27T01:07:21.443466369Z
    END_TIME: 

1 秒あたり 1 回のディスパッチ レートで子ワークフローのイテレーションを 100 回呼び出すワークフローが正常にデプロイされました。