Cloud Storage に保存されているイベントデータを処理する Cloud Run ジョブを実行する

Workflows を使用すると、より複雑なデータ処理を行うワークフローの一部として、または既存のジョブのシステムをオーケストレートするワークフローの一部として、Cloud Run ジョブを実行できます。

このチュートリアルでは、Workflows を使用して Cloud Storage バケットに保存されているイベントデータを処理する Cloud Run ジョブを実行する方法について説明します。イベント ペイロードを Cloud Storage バケットに保存すると、顧客管理の暗号鍵を使用してデータの暗号化が可能です。これは、Cloud Run ジョブへ環境変数としてイベントデータを渡す場合には不可能です。

次の図は、その概要を示しています。

Pub/Sub イベントは、Eventarc トリガーによって Workflows にルーティングされ、Cloud Storage バケットに保存されます。Cloud Run ジョブは、バケットに保存されるイベントデータを処理します。

Cloud Run ジョブの作成

このチュートリアルでは、GitHub で入手できるサンプルコードを使用します。デプロイ スクリプトは、コンテナ イメージをビルドして Cloud Run ジョブを作成します。このスクリプトは、Cloud Storage バケットも作成します。Cloud Run ジョブは、Cloud Storage バケットに保存されているイベントデータを読み取り、そのイベントデータを出力します。

  1. Cloud Shell でデプロイ スクリプトを実行している場合や、Compute Engine のデフォルトのサービス アカウントに編集者ロールが設定されていない場合は、Compute Engine のデフォルト サービス アカウントにプロジェクトに対する次のロールを付与します。(この手順をスキップして、次のステップでサンプルアプリ リポジトリのクローンを作成することもできます。)

    1. Artifact Registry 書き込みロールroles/artifactregistry.writer)を付与します。

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/artifactregistry.writer

      PROJECT_NUMBER は、使用する Google Cloudプロジェクト番号に置き換えます。プロジェクト番号は、 Google Cloud コンソールの [ようこそ] ページで確認できます。また、次のコマンドでも確認できます。

      gcloud projects describe PROJECT_ID --format='value(projectNumber)'

    2. Storage オブジェクト ユーザー ロールroles/storage.objectUser)を付与します。

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/storage.objectUser
    3. Logging ログ書き込みロールroles/logging.logWriter)を付与します。

      gcloud projects add-iam-policy-binding PROJECT_ID \
          --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
          --role=roles/logging.logWriter
  2. ローカルマシンにサンプルアプリのリポジトリのクローンを作成して、サンプルコードを取得します。

    git clone https://github.com/GoogleCloudPlatform/workflows-demos.git

    または、zip 形式のサンプルをダウンロードすることもできます。

  3. サンプルコードが入っているディレクトリに移動します。

    cd workflows-demos/cloud-run-jobs-payload-gcs/message-payload-job
  4. デプロイ スクリプトを実行して、Cloud Run ジョブを作成します。

    ./deploy-job.sh

スクリプトは、message-payload-PROJECT_ID という名前の Cloud Storage バケットを作成します。ここで、PROJECT_ID は Google Cloud プロジェクトの ID です。message-payload-job という名前の Cloud Run ジョブも作成されます。

Cloud Run ジョブを実行するワークフローをデプロイする

作成した Cloud Run ジョブを実行するワークフローを定義してデプロイします。ワークフロー定義は、Workflows 構文を使用して説明した一連のステップで構成されています。

ワークフローはイベントを受信し、イベントデータを Cloud Storage バケットに保存してから、Cloud Run ジョブを実行してイベントデータを処理します。

コンソール

  1. Google Cloud コンソールで、[ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. [ 作成] をクリックします。

  3. 新しいワークフローの名前を入力します(例: message-payload-workflow)。

  4. 適切なリージョン(例: us-central1)を選択します。

  5. [サービス アカウント] フィールドで、先ほど作成したサービス アカウントを選択します。

    サービス アカウントは、ワークフローの ID として機能します。サービス アカウントには、次のロールがすでに付与されているはずです。

    • Cloud Run 管理者: Cloud Run ジョブを実行します
    • ログ書き込み: ログエントリを書き込みます
    • Storage オブジェクト作成者: Cloud Storage にオブジェクトを作成します
  6. [次へ] をクリックします。

  7. ワークフロー エディタで、次のワークフローの定義を入力します。

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  8. [デプロイ] をクリックします。

gcloud

  1. ワークフローのソースコード ファイルを作成します。

    touch message-payload-workflow.yaml
  2. 次のワークフロー定義を message-payload-workflow.yaml にコピーします。

    main:
        params: [event]
        steps:
            - init:
                assign:
                    - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
                    - event_bucket: ${"message-payload-" + project_id}
                    - event_file: ${event.id + ".data.json"}
                    - job_name: message-payload-job
                    - job_location: us-central1
            - log_event:
                call: sys.log
                args:
                    data: ${event}
            - write_payload_to_gcs:
                call: http.post
                args:
                    url: ${"https://storage.googleapis.com/upload/storage/v1/b/" + event_bucket + "/o"}
                    auth:
                        type: OAuth2
                    query:
                        name: ${event_file}
                    body:
                        ${event.data}
            - run_job_to_process_payload:
                call: googleapis.run.v1.namespaces.jobs.run
                args:
                    name: ${"namespaces/" + project_id + "/jobs/" + job_name}
                    location: ${job_location}
                    body:
                        overrides:
                            containerOverrides:
                                env:
                                    - name: INPUT_BUCKET
                                      value: ${event_bucket}
                                    - name: INPUT_FILE
                                      value: ${event_file}
                result: job_execution
            - finish:
                return: ${job_execution}
  3. 次のコマンドを入力してワークフローをデプロイします。

    gcloud workflows deploy message-payload-workflow \
        --location=us-central1 \
        --source=message-payload-workflow.yaml \
        --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

    以下を置き換えます。

    • SERVICE_ACCOUNT_NAME: 先ほど作成したサービス アカウントの名前
    • PROJECT_ID:Google Cloud プロジェクトの ID

    サービス アカウントは、ワークフローの ID として機能します。サービス アカウントには、次のロールがすでに付与されているはずです。

    • roles/logging.logWriter: ログエントリを書き込みます
    • roles/run.admin: Cloud Run ジョブを実行します
    • roles/storage.objectCreator: Cloud Storage にオブジェクトを作成します

ワークフローは、次のことを行います。

  1. init ステップ - イベントを引数として受け取り、必要な変数を設定します。

  2. log_event ステップ - 関数 sys.log を使用して、Cloud Logging にログエントリを作成します。

  3. write_payload_to_gcs ステップ - HTTP POST リクエストを行い、イベント ペイロード データを Cloud Storage バケット ファイルに書き込みます。

  4. run_job_to_process_payload ステップ - Cloud Run Admin API コネクタ メソッド googleapis.run.v1.namespaces.jobs.run を使用して、ジョブを実行します。Cloud Storage バケットとデータファイル名が、ワークフローからジョブにオーバーライド変数として渡されます。

  5. finish ステップ - ワークフローの結果としてジョブの実行に関する情報を返します。

Pub/Sub トピックの作成

メッセージをパブリッシュできるように、Pub/Sub トピックを作成します。Pub/Sub イベントは、Workflows を使用してイベントをルーティングし、Cloud Storage にイベントを保存して、Cloud Run ジョブがイベントデータを処理できるようにする方法を示すために使用されます。

コンソール

  1. Google Cloud コンソールで、[トピック] ページに移動します。

    [トピック] に移動

  2. [トピックを作成] をクリックします。

  3. [トピック ID] フィールドに、トピックの ID(message-payload-topic など)を入力します。

  4. [デフォルトのサブスクリプションを追加] オプションは、そのまま保持します。

  5. 他のオプションは選択しないでください。

  6. [作成] をクリックします。

gcloud

ID message-payload-topic を含むトピックを作成するには、gcloud pubsub topics create コマンドを実行します。

gcloud pubsub topics create message-payload-topic

イベントをワークフローにルーティングする Eventarc トリガーを作成する

ワークフローと Cloud Run ジョブを自動的に実行するには、Pub/Sub イベントに応答し、イベントをワークフローに転送する Eventarc トリガーを作成します。メッセージが Pub/Sub トピックに書き込まれるたびに、イベントによってワークフローの実行がトリガーされます。

コンソール

  1. Google Cloud コンソールで、[ワークフロー] ページに移動します。

    [ワークフロー] に移動

  2. ワークフローの名前(message-payload-workflow など)をクリックします。

  3. [ワークフローの詳細] ページで [ 編集] をクリックします。

  4. [ワークフローの編集] ページの [トリガー] セクションで、[新しいトリガーを追加] > [Eventarc] の順にクリックします。

    [Eventarc トリガー] ペインが開きます。

  5. [トリガー名] フィールドに、トリガーの名前を入力します(例: message-payload-trigger)。

  6. [イベント プロバイダ] リストから、[Cloud Pub/Sub] を選択します。

  7. [イベント] リストから、[google.cloud.pubsub.topic.v1.messagePublished] を選択します。

  8. [Cloud Pub/Sub トピックを選択してください] リストから、前に作成した Pub/Sub トピックを選択します。

  9. [サービス アカウント] フィールドで、先ほど作成したサービス アカウントを選択します。

    サービス アカウントは、トリガーの ID として機能します。サービス アカウントには、次のロールがすでに付与されているはずです。

    • Eventarc イベント受信者: イベントを受信します
    • Workflows 起動元: Workflows を実行します
  10. [トリガーを保存] をクリックします。

    Eventarc トリガーは、[ワークフローの編集] ページの [トリガー] セクションに表示されます。

  11. [次へ] をクリックします。

  12. [デプロイ] をクリックします。

gcloud

次のコマンドを実行して、Eventarc トリガーを作成します。

gcloud eventarc triggers create message-payload-trigger \
    --location=us-central1 \
    --destination-workflow=message-payload-workflow \
    --destination-workflow-location=us-central1 \
    --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
    --transport-topic=projects/PROJECT_ID/topics/message-payload-topic \
    --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com

次のように置き換えます。

  • PROJECT_ID: Google Cloud プロジェクトの ID
  • SERVICE_ACCOUNT_NAME: 先ほど作成したサービス アカウントの名前

サービス アカウントは、トリガーの ID として機能します。サービス アカウントには、次のロールがすでに付与されているはずです。

  • roles/eventarc.eventReceiver: イベントを受信します
  • roles/workflows.invoker: Workflows を実行します

ワークフローをトリガーする

Pub/Sub トピックにメッセージをパブリッシュしてイベントを生成し、エンドツーエンド システムをテストします。詳細については、イベントまたは Pub/Sub メッセージでワークフローをトリガーするをご覧ください。

  1. Pub/Sub トピックにメッセージをパブリッシュしてイベントを生成します。

    gcloud pubsub topics publish message-payload-topic --message="Hello World"
    

    イベントはワークフローに転送され、イベント メッセージがログに記録され、イベントデータが Cloud Storage バケットに保存されて、Cloud Storage に保存されたデータを処理する Cloud Run ジョブが実行されます。この処理には 1 分ほどかかることがあります。

  2. ジョブ実行を表示して、Cloud Run ジョブが想定どおりに実行されたことを確認します。

    gcloud run jobs executions list --job=message-payload-job

    新しいジョブの実行が出力に表示されます。

  3. ワークフローのトリガーによって作成されたイベント関連ログエントリを表示するには、次のコマンドを実行します。

    gcloud logging read "resource.type=cloud_run_job AND textPayload:Payload"
    
  4. 次のようなログエントリを探します。

    textPayload: "Payload: {'message': {'data': 'SGVsbG8gV29ybGQ=', 'messageId': '8834108235224238',\
    \ 'publishTime': '2023-09-20T17:07:52.921Z'}, 'subscription': 'projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741'}"
    ...
    resource:
    labels:
      job_name: message-payload-job
      location: us-central1
      project_id: MY_PROJECT
    type: cloud_run_job
    textPayload: Processing message payload gs://message-payload-MY_PROJECT/8254002311197919.data.json
    
  5. Cloud Storage バケット オブジェクトのイベントデータを表示することで、結果が想定どおりであることを確認できます。

    1. バケット名を取得します。

      gcloud storage ls

      出力は次のようになります。

      gs://message-payload-PROJECT_ID/

    2. バケット内のオブジェクトを一覧表示します。

      gcloud storage ls gs://message-payload-PROJECT_ID/** --recursive

      出力例を以下に示します。

      gs://message-payload-PROJECT_ID/OBJECT_ID.data.json

      OBJECT_ID をメモします。これは次の手順で使用します。

    3. バケット内のオブジェクトをファイルとしてダウンロードします。

      gcloud storage cp gs://message-payload-PROJECT_ID/OBJECT_ID.data.json message-event.txt

      OBJECT_ID は、前の手順で返された ID に置き換えます。

    4. テキスト エディタを使用して、message-event.txt ファイルを開きます。ファイルに書き込まれたイベント本文は次のようになります。

      {
        "message": {
          "data": "SGVsbG8gV29ybGQ=",
          "messageId": "8254002311197919",
          "publishTime": "2023-09-20T16:54:29.312Z"
        },
        "subscription": "projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741"
      }
      

      SGVsbG8gV29ybGQ= 値を Base64 形式からデコードすると「Hello World」が返されます。