Workflows を使用して Batch ジョブを実行する

Batch は、Compute Engine 仮想マシン(VM)インスタンスでバッチ処理ワークロードのスケジューリング、キューイング、実行を行えるフルマネージド サービスです。Batch がお客様に代わってリソースをプロビジョニングし、容量を管理することで、バッチ ワークロードを大規模に実行できます。

Workflows では、Workflows の構文を使用して定義した順序で必要なサービスを実行できます。

このチュートリアルでは、Batch 用 Workflows コネクタを使用して、2 つの Compute Engine VM で 6 つのタスクを同時に実行する Batch ジョブをスケジュール設定して実行します。Batch と Workflows の両方を使用すると、それぞれの利点を享受し、プロセス全体を効率的にプロビジョニングしてオーケストレートできます。

Artifact Registry リポジトリを作成する

Docker コンテナ イメージを保存するリポジトリを作成します。

コンソール

  1. Google Cloud コンソールで、[リポジトリ] ページに移動します。

    [リポジトリ] に移動

  2. [リポジトリを作成] をクリックします。

  3. リポジトリ名として「containers」と入力します。

  4. [形式] は [Docker] を選択します。

  5. [ロケーション タイプ] で、[リージョン] を選択します。

  6. [リージョン] リストで [us-central1] を選択します。

  7. [作成] をクリックします。

gcloud

次のコマンドを実行します。

  gcloud artifacts repositories create containers \
    --repository-format=docker \
    --location=us-central1

us-central1 リージョンに containers という名前の Artifact Registry リポジトリを作成しました。サポートされているリージョンの詳細については、Artifact Registry のロケーションをご覧ください。

コードサンプルを取得する

Google Cloud は、このチュートリアルのアプリケーション ソースコードを GitHub に保存します。対象のリポジトリのクローンを作成するか、サンプルをダウンロードできます。

  1. ローカルマシンにサンプルアプリのリポジトリのクローンを作成します。

    git clone https://github.com/GoogleCloudPlatform/batch-samples.git
    

    または、main.zip ファイルのサンプルをダウンロードして抽出することもできます。

  2. サンプルコードが入っているディレクトリに移動します。

    cd batch-samples/primegen
    

これで、開発環境にアプリケーションのソースコードを用意できました。

Cloud Build を使用して Docker イメージをビルドする

Dockerfile には、Cloud Build を使用して Docker イメージをビルドするために必要な情報が含まれています。次のコマンドを実行してビルドします。

gcloud builds submit \
  -t us-central1-docker.pkg.dev/PROJECT_ID/containers/primegen-service:v1 PrimeGenService/

PROJECT_ID は、実際の Google Cloudプロジェクト ID に置き換えます。

ビルドが完了すると、次のような出力が表示されます。

DONE
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
ID: a54818cc-5d14-467b-bfda-5fc9590af68c
CREATE_TIME: 2022-07-29T01:48:50+00:00
DURATION: 48S
SOURCE: gs://project-name_cloudbuild/source/1659059329.705219-17aee3a424a94679937a7200fab15bcf.tgz
IMAGES: us-central1-docker.pkg.dev/project-name/containers/primegen-service:v1
STATUS: SUCCESS

Dockerfile を使用して、primegen-service という名前の Docker イメージをビルドし、そのイメージを containers という名前の Artifact Registry リポジトリに push しました。

Batch ジョブをスケジュール設定して実行するワークフローをデプロイする

次のワークフローは、2 つの Compute Engine VM で 6 つのタスクとして Docker コンテナを並列に実行する Batch ジョブをスケジュールして実行します。結果として、6 個の素数のバッチが生成され Cloud Storage バケットに格納されます。

コンソール

  1. Google Cloud コンソールで [ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [ 作成] をクリックします。

  3. 新しいワークフローの名前を入力します(例: batch-workflow)。

  4. [リージョン] リストで [us-central1] を選択します。

  5. 先ほど作成したサービス アカウントを選択します。

  6. [次へ] をクリックします。

  7. ワークフロー エディタで、次のワークフローの定義を入力します。

    YAML

    main:
      params: [args]
      steps:
        - init:
            assign:
              - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - region: "us-central1"
              - imageUri: ${region + "-docker.pkg.dev/" + projectId + "/containers/primegen-service:v1"}
              - jobId: ${"job-primegen-" + string(int(sys.now()))}
              - bucket: ${projectId + "-" + jobId}
        - createBucket:
            call: googleapis.storage.v1.buckets.insert
            args:
              query:
                project: ${projectId}
              body:
                name: ${bucket}
        - logCreateBucket:
            call: sys.log
            args:
              data: ${"Created bucket " + bucket}
        - logCreateBatchJob:
            call: sys.log
            args:
              data: ${"Creating and running the batch job " + jobId}
        - createAndRunBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.create
            args:
                parent: ${"projects/" + projectId + "/locations/" + region}
                jobId: ${jobId}
                body:
                  taskGroups:
                    taskSpec:
                      runnables:
                        - container:
                            imageUri: ${imageUri}
                          environment:
                            variables:
                              BUCKET: ${bucket}
                    # Run 6 tasks on 2 VMs
                    taskCount: 6
                    parallelism: 2
                  logsPolicy:
                    destination: CLOUD_LOGGING
            result: createAndRunBatchJobResponse
        # You can delete the batch job or keep it for debugging
        - logDeleteBatchJob:
            call: sys.log
            args:
              data: ${"Deleting the batch job " + jobId}
        - deleteBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.delete
            args:
                name: ${"projects/" + projectId + "/locations/" + region + "/jobs/" + jobId}
            result: deleteResult
        - returnResult:
            return:
              jobId: ${jobId}
              bucket: ${bucket}

    JSON

    {
      "main": {
        "params": [
          "args"
        ],
        "steps": [
          {
            "init": {
              "assign": [
                {
                  "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                },
                {
                  "region": "us-central1"
                },
                {
                  "imageUri": "${region + \"-docker.pkg.dev/\" + projectId + \"/containers/primegen-service:v1\"}"
                },
                {
                  "jobId": "${\"job-primegen-\" + string(int(sys.now()))}"
                },
                {
                  "bucket": "${projectId + \"-\" + jobId}"
                }
              ]
            }
          },
          {
            "createBucket": {
              "call": "googleapis.storage.v1.buckets.insert",
              "args": {
                "query": {
                  "project": "${projectId}"
                },
                "body": {
                  "name": "${bucket}"
                }
              }
            }
          },
          {
            "logCreateBucket": {
              "call": "sys.log",
              "args": {
                "data": "${\"Created bucket \" + bucket}"
              }
            }
          },
          {
            "logCreateBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Creating and running the batch job \" + jobId}"
              }
            }
          },
          {
            "createAndRunBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.create",
              "args": {
                "parent": "${\"projects/\" + projectId + \"/locations/\" + region}",
                "jobId": "${jobId}",
                "body": {
                  "taskGroups": {
                    "taskSpec": {
                      "runnables": [
                        {
                          "container": {
                            "imageUri": "${imageUri}"
                          },
                          "environment": {
                            "variables": {
                              "BUCKET": "${bucket}"
                            }
                          }
                        }
                      ]
                    },
                    "taskCount": 6,
                    "parallelism": 2
                  },
                  "logsPolicy": {
                    "destination": "CLOUD_LOGGING"
                  }
                }
              },
              "result": "createAndRunBatchJobResponse"
            }
          },
          {
            "logDeleteBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Deleting the batch job \" + jobId}"
              }
            }
          },
          {
            "deleteBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.delete",
              "args": {
                "name": "${\"projects/\" + projectId + \"/locations/\" + region + \"/jobs/\" + jobId}"
              },
              "result": "deleteResult"
            }
          },
          {
            "returnResult": {
              "return": {
                "jobId": "${jobId}",
                "bucket": "${bucket}"
              }
            }
          }
        ]
      }
    }
    
  8. [デプロイ] をクリックします。

gcloud

  1. ワークフローのソースコード ファイルを作成します。

    touch batch-workflow.JSON_OR_YAML

    ワークフローの形式に応じて、JSON_OR_YAMLyaml または json に置き換えます。

  2. テキスト エディタで、次のワークフローをソースコード ファイルにコピーします。

    YAML

    main:
      params: [args]
      steps:
        - init:
            assign:
              - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              - region: "us-central1"
              - imageUri: ${region + "-docker.pkg.dev/" + projectId + "/containers/primegen-service:v1"}
              - jobId: ${"job-primegen-" + string(int(sys.now()))}
              - bucket: ${projectId + "-" + jobId}
        - createBucket:
            call: googleapis.storage.v1.buckets.insert
            args:
              query:
                project: ${projectId}
              body:
                name: ${bucket}
        - logCreateBucket:
            call: sys.log
            args:
              data: ${"Created bucket " + bucket}
        - logCreateBatchJob:
            call: sys.log
            args:
              data: ${"Creating and running the batch job " + jobId}
        - createAndRunBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.create
            args:
                parent: ${"projects/" + projectId + "/locations/" + region}
                jobId: ${jobId}
                body:
                  taskGroups:
                    taskSpec:
                      runnables:
                        - container:
                            imageUri: ${imageUri}
                          environment:
                            variables:
                              BUCKET: ${bucket}
                    # Run 6 tasks on 2 VMs
                    taskCount: 6
                    parallelism: 2
                  logsPolicy:
                    destination: CLOUD_LOGGING
            result: createAndRunBatchJobResponse
        # You can delete the batch job or keep it for debugging
        - logDeleteBatchJob:
            call: sys.log
            args:
              data: ${"Deleting the batch job " + jobId}
        - deleteBatchJob:
            call: googleapis.batch.v1.projects.locations.jobs.delete
            args:
                name: ${"projects/" + projectId + "/locations/" + region + "/jobs/" + jobId}
            result: deleteResult
        - returnResult:
            return:
              jobId: ${jobId}
              bucket: ${bucket}

    JSON

    {
      "main": {
        "params": [
          "args"
        ],
        "steps": [
          {
            "init": {
              "assign": [
                {
                  "projectId": "${sys.get_env(\"GOOGLE_CLOUD_PROJECT_ID\")}"
                },
                {
                  "region": "us-central1"
                },
                {
                  "imageUri": "${region + \"-docker.pkg.dev/\" + projectId + \"/containers/primegen-service:v1\"}"
                },
                {
                  "jobId": "${\"job-primegen-\" + string(int(sys.now()))}"
                },
                {
                  "bucket": "${projectId + \"-\" + jobId}"
                }
              ]
            }
          },
          {
            "createBucket": {
              "call": "googleapis.storage.v1.buckets.insert",
              "args": {
                "query": {
                  "project": "${projectId}"
                },
                "body": {
                  "name": "${bucket}"
                }
              }
            }
          },
          {
            "logCreateBucket": {
              "call": "sys.log",
              "args": {
                "data": "${\"Created bucket \" + bucket}"
              }
            }
          },
          {
            "logCreateBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Creating and running the batch job \" + jobId}"
              }
            }
          },
          {
            "createAndRunBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.create",
              "args": {
                "parent": "${\"projects/\" + projectId + \"/locations/\" + region}",
                "jobId": "${jobId}",
                "body": {
                  "taskGroups": {
                    "taskSpec": {
                      "runnables": [
                        {
                          "container": {
                            "imageUri": "${imageUri}"
                          },
                          "environment": {
                            "variables": {
                              "BUCKET": "${bucket}"
                            }
                          }
                        }
                      ]
                    },
                    "taskCount": 6,
                    "parallelism": 2
                  },
                  "logsPolicy": {
                    "destination": "CLOUD_LOGGING"
                  }
                }
              },
              "result": "createAndRunBatchJobResponse"
            }
          },
          {
            "logDeleteBatchJob": {
              "call": "sys.log",
              "args": {
                "data": "${\"Deleting the batch job \" + jobId}"
              }
            }
          },
          {
            "deleteBatchJob": {
              "call": "googleapis.batch.v1.projects.locations.jobs.delete",
              "args": {
                "name": "${\"projects/\" + projectId + \"/locations/\" + region + \"/jobs/\" + jobId}"
              },
              "result": "deleteResult"
            }
          },
          {
            "returnResult": {
              "return": {
                "jobId": "${jobId}",
                "bucket": "${bucket}"
              }
            }
          }
        ]
      }
    }
    
  3. 次のコマンドを入力してワークフローをデプロイします。

    gcloud workflows deploy batch-workflow \
      --source=batch-workflow.yaml \
      --location=us-central1 \
      --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    SERVICE_ACCOUNT_NAME は、先ほど作成したサービス アカウントの名前に置き換えます。

ワークフローを実行する

ワークフローを実行すると、そのワークフローに関連付けられた現在のワークフロー定義が実行されます。

コンソール

  1. Google Cloud コンソールで [ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [ワークフロー] ページで、[batch-workflow] ワークフローをクリックして詳細ページに移動します。

  3. [ワークフローの詳細] ページで [ 実行] を選択します。

  4. もう一度 [Execute] をクリックします。

    ワークフローの実行には数分を要します。

  5. ワークフローの結果が [出力] ペインに表示されます。

    結果は次のようになります。

    {
      "bucket": "project-name-job-primegen-TIMESTAMP",
      "jobId": "job-primegen-TIMESTAMP"
    }
    

gcloud

  1. ワークフローを実行します。

    gcloud workflows run batch-workflow \
      --location=us-central1

    ワークフローの実行には数分を要します。

  2. 長時間実行のステータスを確認できます。

  3. 最後に完了した実行のステータスを取得するには、次のコマンドを実行します。

    gcloud workflows executions describe-last

    結果の例を以下に示します。

    name: projects/PROJECT_NUMBER/locations/us-central1/workflows/batch-workflow/executions/EXECUTION_ID
    result: '{"bucket":"project-name-job-primegen-TIMESTAMP","jobId":"job-primegen-TIMESTAMP"}'
    startTime: '2022-07-29T16:08:39.725306421Z'
    state: SUCCEEDED
    status:
      currentSteps:
      - routine: main
        step: returnResult
    workflowRevisionId: 000001-9ba
    

出力バケット内のオブジェクトを一覧表示する。

Cloud Storage 出力バケット内のオブジェクトを一覧表示することで、結果が想定どおりであることを確認できます。

コンソール

  1. Google Cloud コンソールで Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  2. バケットリストで、コンテンツを表示するバケットの名前をクリックします。

    結果は次のようになります。合計で 6 つのファイルがあり、それぞれが 10,000 個の素数のバッチを一覧表示しています。

    primes-1-10000.txt
    primes-10001-20000.txt
    primes-20001-30000.txt
    primes-30001-40000.txt
    primes-40001-50000.txt
    primes-50001-60000.txt
    

gcloud

  1. 出力バケット名を取得します。

    gcloud storage ls

    出力は次のようになります。

    gs://PROJECT_ID-job-primegen-TIMESTAMP/

  2. バケット内のオブジェクトを一覧表示する。

    gcloud storage ls gs://PROJECT_ID-job-primegen-TIMESTAMP/** --recursive

    TIMESTAMP は、前のコマンドで返されたタイムスタンプに置き換えます。

    出力は次のようになります。合計で 6 つのファイルがあり、それぞれに 10,000 個の素数のバッチが一覧表示されています。

    gs://project-name-job-primegen-TIMESTAMP/primes-1-10000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-10001-20000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-20001-30000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-30001-40000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-40001-50000.txt
    gs://project-name-job-primegen-TIMESTAMP/primes-50001-60000.txt