Workflows を使用して Cloud Run ジョブを実行する

Workflows を使用すると、ワークフローの一部として Cloud Run ジョブを実行し、より複雑なデータ処理を行う、または既存のジョブのシステムをオーケストレートします。

このチュートリアルでは、Workflows を使用して、Cloud Storage のイベントに応答して環境変数としてジョブに渡されるデータを処理する Cloud Run ジョブを実行する方法について説明します。

イベントデータを Cloud Storage バケットに保存することもできます。これにより、顧客管理の暗号鍵を使用してデータを暗号化できます。詳細については、Cloud Storage に保存されているイベントデータを処理する Cloud Run ジョブを実行するをご覧ください。

Cloud Run ジョブの作成

このチュートリアルでは、GitHub のサンプル Cloud Run ジョブを使用します。このジョブは、Cloud Storage の入力ファイルからデータを読み取り、ファイルの各行に対して任意の処理を実行します。

  1. ローカルマシンにサンプルアプリのリポジトリのクローンを作成して、サンプルコードを取得します。

    git clone https://github.com/GoogleCloudPlatform/jobs-demos.git

    または、サンプルを ZIP ファイルとしてダウンロードし、ファイルを抽出することもできます。

  2. サンプルコードが入っているディレクトリに移動します。

    cd jobs-demos/parallel-processing
  3. 書き込みが可能でイベントをトリガーできる入力ファイルを保存するための Cloud Storage バケットを作成します。

    コンソール

    1. Google Cloud コンソールで Cloud Storage の [バケット] ページに移動します。

      [バケット] に移動

    2. add [作成] をクリックします。
    3. [バケットの作成] ページで、バケットの名前を入力します。
      input-PROJECT_ID
      PROJECT_ID は、 Google Cloud プロジェクトの ID に置き換えます。
    4. 他の値はデフォルトのままにします。
    5. [作成] をクリックします。

    gcloud

    gcloud storage buckets create コマンドを実行します。

    gcloud storage buckets create gs://input-PROJECT_ID

    リクエストが成功すると、コマンドから次のメッセージが返されます。

    Creating gs://input-PROJECT_ID/...

    Terraform

    Cloud Storage バケットを作成するには、google_storage_bucket リソースを使用して、次のサンプルに示すように main.tf ファイルを変更します。

    Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。

    一般的な Terraform ワークフローでは、プラン全体を一度に適用します。ですが、このチュートリアルでは、特定のリソースをターゲットにできます。次に例を示します。

    terraform apply -target="random_id.bucket_name_suffix"
    および
    terraform apply -target="google_storage_bucket.default"

    # Cloud Storage bucket names must be globally unique
    resource "random_id" "bucket_name_suffix" {
      byte_length = 4
    }
    
    # Create a Cloud Storage bucket
    resource "google_storage_bucket" "default" {
      name                        = "input-${data.google_project.project.name}-${random_id.bucket_name_suffix.hex}"
      location                    = "us-central1"
      storage_class               = "STANDARD"
      force_destroy               = false
      uniform_bucket_level_access = true
    }
  4. コンテナ イメージを保存できる Artifact Registry 標準リポジトリを作成します。

    コンソール

    1. Google Cloud コンソールで、Artifact Registry の [リポジトリ] ページに移動します。

      [リポジトリ] に移動

    2. [リポジトリを作成] をクリックします。

    3. リポジトリの名前を入力します(例: my-repo)。プロジェクト内のリポジトリのロケーションごとに、リポジトリ名は一意でなければなりません。

    4. デフォルトの形式(Docker)のままにします。

    5. デフォルト モード(Standard)のままにします。

    6. リージョンに、[us-central1(アイオワ)] を選択します。

    7. その他すべてのデフォルトはそのままにします。

    8. [作成] をクリックします。

    gcloud

    次のコマンドを実行します。

    gcloud artifacts repositories create REPOSITORY \
        --repository-format=docker \
        --location=us-central1

    REPOSITORY は、リポジトリの一意の名前(my-repo など)に置き換えます。プロジェクト内のリポジトリのロケーションごとに、リポジトリ名は一意でなければなりません。

    Terraform

    Artifact Registry リポジトリを作成するには、google_artifact_registry_repository リソースを使用して、次のサンプルに示すように main.tf ファイルを変更します。

    一般的な Terraform ワークフローでは、プラン全体を一度に適用します。ですが、このチュートリアルでは、特定のリソースをターゲットにできます。次に例を示します。

    terraform apply -target="google_artifact_registry_repository.default"

    # Create an Artifact Registry repository
    resource "google_artifact_registry_repository" "default" {
      location      = "us-central1"
      repository_id = "my-repo"
      format        = "docker"
    }
  5. デフォルトの Google Cloud Buildpack を使用してコンテナ イメージをビルドします。

    export SERVICE_NAME=parallel-job
    gcloud builds submit \
        --pack image=us-central1-docker.pkg.dev/PROJECT_ID/REPOSITORY/${SERVICE_NAME}

    REPOSITORY を Artifact Registry リポジトリの名前に置き換えます。

    ビルドが完了するまで数分かかることがあります。

  6. コンテナ イメージをデプロイする Cloud Run ジョブを作成します。

    コンソール

    1. Google Cloud コンソールで、[Cloud Run] ページに移動します。

      Cloud Run に移動

    2. [ジョブを作成] をクリックして、[ジョブを作成] フォームを表示します。

      1. フォームで、Artifact Registry コンテナ イメージの URL として us-central1-docker.pkg.dev/PROJECT_ID/REPOSITORY/parallel-job:latest を選択します。
      2. (省略可)ジョブ名に「parallel-job」と入力します。
      3. (省略可)リージョンに、[us-central1(アイオワ)] を選択します。
      4. ジョブで実行するタスクの数として、「10」と入力します。ジョブが成功するには、すべてのタスクが成功する必要があります。デフォルトでは、タスクは並行して実行されます。
    3. [コンテナ、変数とシークレット、接続、セキュリティ] セクションを開き、次の設定を除き、すべてのデフォルトのままにします。

      1. [全般] タブをクリックします。

        1. コンテナ コマンドに「python」と入力します。
        2. コンテナ引数に「process.py」と入力します。
      2. [変数とシークレット] タブをクリックします。

        1. [変数を追加] をクリックし、名前に「INPUT_BUCKET」、値に「input-PROJECT_ID」と入力します。
        2. [変数を追加] をクリックし、名前に「INPUT_FILE」、値に「input_file.txt」と入力します。
    4. ジョブを作成するには、[作成] をクリックします。

    gcloud

    1. デフォルトの Cloud Run リージョンを設定します。

      gcloud config set run/region us-central1
    2. Cloud Run ジョブを作成します。

      gcloud run jobs create parallel-job \
          --image us-central1-docker.pkg.dev/PROJECT_ID/REPOSITORY/parallel-job:latest \
          --command python \
          --args process.py \
          --tasks 10 \
          --set-env-vars=INPUT_BUCKET=input-PROJECT_ID,INPUT_FILE=input_file.txt

      イメージタグを指定しない場合、Artifact Registry はデフォルトの latest タグが付いたイメージを探します。

      ジョブの作成時に使用可能なオプションの一覧については、gcloud run jobs create コマンドライン ドキュメントをご覧ください。

      ジョブが作成されると、成功したことを示すメッセージが表示されます。

    Terraform

    Cloud Run ジョブを作成するには、google_cloud_run_v2_job リソースを使用して、次のサンプルに示すように main.tf ファイルを変更します。

    一般的な Terraform ワークフローでは、プラン全体を一度に適用します。ですが、このチュートリアルでは、特定のリソースをターゲットにできます。次に例を示します。

    terraform apply -target="google_cloud_run_v2_job.default"

    # Create a Cloud Run job
    resource "google_cloud_run_v2_job" "default" {
      name     = "parallel-job"
      location = "us-central1"
    
      template {
        task_count = 10
        template {
          containers {
            image   = "us-central1-docker.pkg.dev/${data.google_project.project.name}/${google_artifact_registry_repository.default.repository_id}/parallel-job:latest"
            command = ["python"]
            args    = ["process.py"]
            env {
              name  = "INPUT_BUCKET"
              value = google_storage_bucket.default.name
            }
            env {
              name  = "INPUT_FILE"
              value = "input_file.txt"
            }
          }
        }
      }
    }

Cloud Run ジョブを実行するワークフローをデプロイする

作成した Cloud Run ジョブを実行するワークフローを定義してデプロイします。ワークフロー定義は、ワークフロー構文を使用して説明した一連のステップで構成されています。

コンソール

  1. Google Cloud コンソールで、[ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [ 作成] をクリックします。

  3. 新しいワークフローの名前を入力します(例: cloud-run-job-workflow)。

  4. リージョンに、[us-central1(アイオワ)] を選択します。

  5. [サービス アカウント] フィールドで、先ほど作成したサービス アカウントを選択します。

    サービス アカウントは、ワークフローの ID として機能します。ワークフローで Cloud Run ジョブを実行できるようにするには、サービス アカウントに Cloud Run 管理者ロールがすでに付与されている必要があります。

  6. [次へ] をクリックします。

  7. ワークフロー エディタで、次のワークフローの定義を入力します。

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${event.data.bucket}
                    - event_file: ${event.data.name}
                    - target_bucket: ${"input-" + project_id}
                    - job_name: parallel-job
                    - job_location: us-central1
            - check_input_file:
                switch:
                    - condition: ${event_bucket == target_bucket}
                      next: run_job
                    - condition: true
                      next: end
            - run_job:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  8. [デプロイ] をクリックします。

gcloud

  1. ワークフローのソースコード ファイルを作成します。

    touch cloud-run-job-workflow.yaml
  2. 次のワークフロー定義をソースコード ファイルにコピーします。

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${event.data.bucket}
                    - event_file: ${event.data.name}
                    - target_bucket: ${"input-" + project_id}
                    - job_name: parallel-job
                    - job_location: us-central1
            - check_input_file:
                switch:
                    - condition: ${event_bucket == target_bucket}
                      next: run_job
                    - condition: true
                      next: end
            - run_job:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  3. 次のコマンドを入力してワークフローをデプロイします。

    gcloud workflows deploy cloud-run-job-workflow \
        --location=us-central1 \
        --source=cloud-run-job-workflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    以下を置き換えます。

    • SERVICE_ACCOUNT_NAME: 先ほど作成したサービス アカウントの名前
    • PROJECT_ID:Google Cloud プロジェクトの ID

    サービス アカウントは、ワークフローの ID として機能します。ワークフローで Cloud Run ジョブを実行できるようにするには、サービス アカウントに roles/run.admin のロールがすでに付与されている必要があります。

Terraform

ワークフローを作成するには、google_workflows_workflow リソースを使用して、次のサンプルに示すように main.tf ファイルを変更します。

Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。

一般的な Terraform ワークフローでは、プラン全体を一度に適用します。ですが、このチュートリアルでは、特定のリソースをターゲットにできます。次に例を示します。

terraform apply -target="google_workflows_workflow.default"

# Create a workflow
resource "google_workflows_workflow" "default" {
  name        = "cloud-run-job-workflow"
  region      = "us-central1"
  description = "Workflow that routes a Cloud Storage event and executes a Cloud Run job"

  deletion_protection = false # set to "true" in production

  # Note that $$ is needed for Terraform
  source_contents = <<EOF
  main:
      params: [event]
      steps:
          - init:
              assign:
                  - project_id: $${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                  - event_bucket: $${event.data.bucket}
                  - event_file: $${event.data.name}
                  - target_bucket: "${google_storage_bucket.default.name}"
                  - job_name: parallel-job
                  - job_location: us-central1
          - check_input_file:
              switch:
                  - condition: $${event_bucket == target_bucket}
                    next: run_job
                  - condition: true
                    next: end
          - run_job:
              call: googleapis.run.v1.namespaces.jobs.run
              args:
                  name: $${"namespaces/" + project_id + "/jobs/" + job_name}
                  location: $${job_location}
                  body:
                      overrides:
                          containerOverrides:
                              env:
                                  - name: INPUT_BUCKET
                                    value: $${event_bucket}
                                  - name: INPUT_FILE
                                    value: $${event_file}
              result: job_execution
          - finish:
              return: $${job_execution}
  EOF
}

ワークフローは、次のことを行います。

  1. init ステップ - Cloud Storage イベントを引数として受け取り、必要な変数を設定します。

  2. check_input_file ステップ - イベントで指定された Cloud Storage バケットが、Cloud Run ジョブで使用されているバケットであるかどうかを確認します。

    • 「はい」の場合、ワークフローは run_job ステップに進みます。
    • 「いいえ」の場合、ワークフローは終了し、それ以上の処理は停止します。
  3. run_job ステップ - Cloud Run Admin API コネクタの googleapis.run.v1.namespaces.jobs.run メソッドを使用して、ジョブを実行します。Cloud Storage バケットとデータファイル名が、ワークフローからジョブにオーバーライド変数として渡されます。

  4. finish ステップ - ワークフローの結果としてジョブの実行に関する情報を返します。

ワークフローの Eventarc トリガーを作成する

入力データファイルが更新されるたびに自動的にワークフローを実行し、次に Cloud Run ジョブを実行するには、入力データファイルを含むバケットの Cloud Storage イベントに応答する Eventarc トリガーを作成します。

コンソール

  1. Google Cloud コンソールで、[ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. ワークフローの名前(cloud-run-job-workflow など)をクリックします。

  3. [ワークフローの詳細] ページで [ 編集] をクリックします。

  4. [ワークフローの編集] ページの [トリガー] セクションで、[新しいトリガーを追加] > [Eventarc] の順にクリックします。

    [Eventarc トリガー] ペインが開きます。

  5. [トリガー名] フィールドに、トリガーの名前を入力します(例: cloud-run-job-workflow-trigger)。

  6. [イベント プロバイダ] リストで、[Cloud Storage] を選択します。

  7. [イベント] リストから [google.cloud.storage.object.v1.finalized] を選択します。

  8. [バケット] フィールドで、入力データファイルを含むバケットを選択します。バケット名の形式は input-PROJECT_ID です。

  9. [サービス アカウント] フィールドで、先ほど作成したサービス アカウントを選択します。

    サービス アカウントは、トリガーの ID として機能します。サービス アカウントには、次のロールがすでに付与されているはずです。

    • Eventarc イベント受信者: イベントを受信します
    • Workflows 起動元: Workflows を実行します
  10. [トリガーを保存] をクリックします。

    Eventarc トリガーは、[ワークフローの編集] ページの [トリガー] セクションに表示されます。

  11. [次へ] をクリックします。

  12. [デプロイ] をクリックします。

gcloud

次のコマンドを実行して、Eventarc トリガーを作成します。

gcloud eventarc triggers create cloud-run-job-workflow-trigger \
    --location=us \
    --destination-workflow=cloud-run-job-workflow  \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.storage.object.v1.finalized" \
    --event-filters="bucket=input-PROJECT_ID" \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

次のように置き換えます。

  • PROJECT_ID: Google Cloud プロジェクトの ID
  • SERVICE_ACCOUNT_NAME: 先ほど作成したサービス アカウントの名前

サービス アカウントは、トリガーの ID として機能します。サービス アカウントには、次のロールがすでに付与されているはずです。

  • roles/eventarc.eventReceiver: イベントを受信します
  • roles/workflows.invoker: Workflows を実行します

Terraform

トリガーを作成するには、google_eventarc_trigger リソースを使用して、次のサンプルに示すように main.tf ファイルを変更します。

Terraform 構成を適用または削除する方法については、基本的な Terraform コマンドをご覧ください。

一般的な Terraform ワークフローでは、プラン全体を一度に適用します。ですが、このチュートリアルでは、特定のリソースをターゲットにできます。次に例を示します。

terraform apply -target="google_eventarc_trigger.default"

# Create an Eventarc trigger that routes Cloud Storage events to Workflows
resource "google_eventarc_trigger" "default" {
  name     = "cloud-run-job-trigger"
  location = google_workflows_workflow.default.region

  # Capture objects changed in the bucket
  matching_criteria {
    attribute = "type"
    value     = "google.cloud.storage.object.v1.finalized"
  }
  matching_criteria {
    attribute = "bucket"
    value     = google_storage_bucket.default.name
  }

  # Send events to Workflows
  destination {
    workflow = google_workflows_workflow.default.id
  }

  service_account = google_service_account.workflows.email

}

入力データファイルを含む Cloud Storage バケットでファイルがアップロードまたは上書きされるたびに、ワークフローは対応する Cloud Storage イベントを引数として実行されます。

ワークフローをトリガーする

Cloud Storage の入力データファイルを更新して、エンドツーエンド システムをテストします。

  1. 入力ファイルの新しいデータを生成し、Cloud Run ジョブによって想定される Cloud Storage 内の場所にアップロードします。

    base64 /dev/urandom | head -c 100000 >input_file.txt
    gcloud storage cp input_file.txt gs://input-PROJECT_ID/input_file.txt

    Terraform を使用して Cloud Storage バケットを作成した場合は、次のコマンドを実行してバケットの名前を取得できます。

    gcloud storage buckets list gs://input*

    Cloud Run ジョブの実行には数分かかる場合があります。

  2. ジョブ実行を表示して、Cloud Run ジョブが想定どおりに実行されたことを確認します。

    gcloud config set run/region us-central1
    gcloud run jobs executions list --job=parallel-job

    ジョブの実行が成功したことが出力に示され、10/10 タスクが完了したことが示されます。

詳細については、イベントまたは Pub/Sub メッセージでワークフローをトリガーするをご覧ください。