Workflows を使用すると、より複雑なデータ処理を行うワークフローの一部として、または既存のジョブのシステムをオーケストレートするワークフローの一部として、Cloud Run ジョブを実行できます。
このチュートリアルでは、Workflows を使用して Cloud Storage バケットに保存されているイベントデータを処理する Cloud Run ジョブを実行する方法について説明します。イベント ペイロードを Cloud Storage バケットに保存すると、顧客管理の暗号鍵を使用してデータの暗号化が可能です。これは、Cloud Run ジョブへ環境変数としてイベントデータを渡す場合には不可能です。
次の図は、その概要を示しています。
Cloud Run ジョブの作成
このチュートリアルでは、GitHub で入手できるサンプルコードを使用します。デプロイ スクリプトは、コンテナ イメージをビルドして Cloud Run ジョブを作成します。このスクリプトは、Cloud Storage バケットも作成します。Cloud Run ジョブは、Cloud Storage バケットに保存されているイベントデータを読み取り、そのイベントデータを出力します。
Cloud Shell でデプロイ スクリプトを実行している場合や、Compute Engine のデフォルトのサービス アカウントに編集者ロールが設定されていない場合は、Compute Engine のデフォルト サービス アカウントにプロジェクトに対する次のロールを付与します。(この手順をスキップして、次のステップでサンプルアプリ リポジトリのクローンを作成することもできます。)
Artifact Registry 書き込みロール(
roles/artifactregistry.writer
)を付与します。gcloud projects add-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=roles/artifactregistry.writer
PROJECT_NUMBER
は、使用する Google Cloudプロジェクト番号に置き換えます。プロジェクト番号は、 Google Cloud コンソールの [ようこそ] ページで確認できます。また、次のコマンドでも確認できます。gcloud projects describe PROJECT_ID --format='value(projectNumber)'
Storage オブジェクト ユーザー ロール(
roles/storage.objectUser
)を付与します。gcloud projects add-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=roles/storage.objectUser
Logging ログ書き込みロール(
roles/logging.logWriter
)を付与します。gcloud projects add-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=roles/logging.logWriter
ローカルマシンにサンプルアプリのリポジトリのクローンを作成して、サンプルコードを取得します。
git clone https://github.com/GoogleCloudPlatform/workflows-demos.git
または、zip 形式のサンプルをダウンロードすることもできます。
サンプルコードが入っているディレクトリに移動します。
cd workflows-demos/cloud-run-jobs-payload-gcs/message-payload-job
デプロイ スクリプトを実行して、Cloud Run ジョブを作成します。
./deploy-job.sh
スクリプトは、message-payload-PROJECT_ID
という名前の Cloud Storage バケットを作成します。ここで、PROJECT_ID
は Google Cloud プロジェクトの ID です。message-payload-job
という名前の Cloud Run ジョブも作成されます。
Cloud Run ジョブを実行するワークフローをデプロイする
作成した Cloud Run ジョブを実行するワークフローを定義してデプロイします。ワークフロー定義は、Workflows 構文を使用して説明した一連のステップで構成されています。
ワークフローはイベントを受信し、イベントデータを Cloud Storage バケットに保存してから、Cloud Run ジョブを実行してイベントデータを処理します。
コンソール
Google Cloud コンソールで、[ワークフロー] ページに移動します。
[
作成] をクリックします。新しいワークフローの名前を入力します(例:
message-payload-workflow
)。適切なリージョン(例:
us-central1
)を選択します。[サービス アカウント] フィールドで、先ほど作成したサービス アカウントを選択します。
サービス アカウントは、ワークフローの ID として機能します。サービス アカウントには、次のロールがすでに付与されているはずです。
- Cloud Run 管理者: Cloud Run ジョブを実行します
- ログ書き込み: ログエントリを書き込みます
- Storage オブジェクト作成者: Cloud Storage にオブジェクトを作成します
[次へ] をクリックします。
ワークフロー エディタで、次のワークフローの定義を入力します。
[デプロイ] をクリックします。
gcloud
ワークフローのソースコード ファイルを作成します。
touch message-payload-workflow.yaml
次のワークフロー定義を
message-payload-workflow.yaml
にコピーします。次のコマンドを入力してワークフローをデプロイします。
gcloud workflows deploy message-payload-workflow \ --location=us-central1 \ --source=message-payload-workflow.yaml \ --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
以下を置き換えます。
SERVICE_ACCOUNT_NAME
: 先ほど作成したサービス アカウントの名前PROJECT_ID
:Google Cloud プロジェクトの ID
サービス アカウントは、ワークフローの ID として機能します。サービス アカウントには、次のロールがすでに付与されているはずです。
roles/logging.logWriter
: ログエントリを書き込みますroles/run.admin
: Cloud Run ジョブを実行しますroles/storage.objectCreator
: Cloud Storage にオブジェクトを作成します
ワークフローは、次のことを行います。
init
ステップ - イベントを引数として受け取り、必要な変数を設定します。log_event
ステップ - 関数 sys.log を使用して、Cloud Logging にログエントリを作成します。write_payload_to_gcs
ステップ - HTTPPOST
リクエストを行い、イベント ペイロード データを Cloud Storage バケット ファイルに書き込みます。run_job_to_process_payload
ステップ - Cloud Run Admin API コネクタ メソッドgoogleapis.run.v1.namespaces.jobs.run
を使用して、ジョブを実行します。Cloud Storage バケットとデータファイル名が、ワークフローからジョブにオーバーライド変数として渡されます。finish
ステップ - ワークフローの結果としてジョブの実行に関する情報を返します。
Pub/Sub トピックの作成
メッセージをパブリッシュできるように、Pub/Sub トピックを作成します。Pub/Sub イベントは、Workflows を使用してイベントをルーティングし、Cloud Storage にイベントを保存して、Cloud Run ジョブがイベントデータを処理できるようにする方法を示すために使用されます。
コンソール
Google Cloud コンソールで、[トピック] ページに移動します。
[
トピックを作成] をクリックします。[トピック ID] フィールドに、トピックの ID(
message-payload-topic
など)を入力します。[デフォルトのサブスクリプションを追加] オプションは、そのまま保持します。
他のオプションは選択しないでください。
[作成] をクリックします。
gcloud
ID message-payload-topic
を含むトピックを作成するには、gcloud pubsub topics create
コマンドを実行します。
gcloud pubsub topics create message-payload-topic
イベントをワークフローにルーティングする Eventarc トリガーを作成する
ワークフローと Cloud Run ジョブを自動的に実行するには、Pub/Sub イベントに応答し、イベントをワークフローに転送する Eventarc トリガーを作成します。メッセージが Pub/Sub トピックに書き込まれるたびに、イベントによってワークフローの実行がトリガーされます。
コンソール
Google Cloud コンソールで、[ワークフロー] ページに移動します。
ワークフローの名前(
message-payload-workflow
など)をクリックします。[ワークフローの詳細] ページで [
編集] をクリックします。[ワークフローの編集] ページの [トリガー] セクションで、[新しいトリガーを追加] > [Eventarc] の順にクリックします。
[Eventarc トリガー] ペインが開きます。
[トリガー名] フィールドに、トリガーの名前を入力します(例:
message-payload-trigger
)。[イベント プロバイダ] リストから、[Cloud Pub/Sub] を選択します。
[イベント] リストから、[google.cloud.pubsub.topic.v1.messagePublished] を選択します。
[Cloud Pub/Sub トピックを選択してください] リストから、前に作成した Pub/Sub トピックを選択します。
[サービス アカウント] フィールドで、先ほど作成したサービス アカウントを選択します。
サービス アカウントは、トリガーの ID として機能します。サービス アカウントには、次のロールがすでに付与されているはずです。
- Eventarc イベント受信者: イベントを受信します
- Workflows 起動元: Workflows を実行します
[トリガーを保存] をクリックします。
Eventarc トリガーは、[ワークフローの編集] ページの [トリガー] セクションに表示されます。
[次へ] をクリックします。
[デプロイ] をクリックします。
gcloud
次のコマンドを実行して、Eventarc トリガーを作成します。
gcloud eventarc triggers create message-payload-trigger \ --location=us-central1 \ --destination-workflow=message-payload-workflow \ --destination-workflow-location=us-central1 \ --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \ --transport-topic=projects/PROJECT_ID/topics/message-payload-topic \ --service-account=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
次のように置き換えます。
PROJECT_ID
: Google Cloud プロジェクトの IDSERVICE_ACCOUNT_NAME
: 先ほど作成したサービス アカウントの名前
サービス アカウントは、トリガーの ID として機能します。サービス アカウントには、次のロールがすでに付与されているはずです。
roles/eventarc.eventReceiver
: イベントを受信しますroles/workflows.invoker
: Workflows を実行します
ワークフローをトリガーする
Pub/Sub トピックにメッセージをパブリッシュしてイベントを生成し、エンドツーエンド システムをテストします。詳細については、イベントまたは Pub/Sub メッセージでワークフローをトリガーするをご覧ください。
Pub/Sub トピックにメッセージをパブリッシュしてイベントを生成します。
gcloud pubsub topics publish message-payload-topic --message="Hello World"
イベントはワークフローに転送され、イベント メッセージがログに記録され、イベントデータが Cloud Storage バケットに保存されて、Cloud Storage に保存されたデータを処理する Cloud Run ジョブが実行されます。この処理には 1 分ほどかかることがあります。
ジョブ実行を表示して、Cloud Run ジョブが想定どおりに実行されたことを確認します。
gcloud run jobs executions list --job=message-payload-job
新しいジョブの実行が出力に表示されます。
ワークフローのトリガーによって作成されたイベント関連ログエントリを表示するには、次のコマンドを実行します。
gcloud logging read "resource.type=cloud_run_job AND textPayload:Payload"
次のようなログエントリを探します。
textPayload: "Payload: {'message': {'data': 'SGVsbG8gV29ybGQ=', 'messageId': '8834108235224238',\ \ 'publishTime': '2023-09-20T17:07:52.921Z'}, 'subscription': 'projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741'}" ... resource: labels: job_name: message-payload-job location: us-central1 project_id: MY_PROJECT type: cloud_run_job textPayload: Processing message payload gs://message-payload-MY_PROJECT/8254002311197919.data.json
Cloud Storage バケット オブジェクトのイベントデータを表示することで、結果が想定どおりであることを確認できます。
バケット名を取得します。
gcloud storage ls
出力は次のようになります。
gs://message-payload-PROJECT_ID/
バケット内のオブジェクトを一覧表示します。
gcloud storage ls gs://message-payload-PROJECT_ID/** --recursive
出力例を以下に示します。
gs://message-payload-PROJECT_ID/OBJECT_ID.data.json
OBJECT_ID
をメモします。これは次の手順で使用します。バケット内のオブジェクトをファイルとしてダウンロードします。
gcloud storage cp gs://message-payload-PROJECT_ID/OBJECT_ID.data.json message-event.txt
OBJECT_ID
は、前の手順で返された ID に置き換えます。テキスト エディタを使用して、
message-event.txt
ファイルを開きます。ファイルに書き込まれたイベント本文は次のようになります。{ "message": { "data": "SGVsbG8gV29ybGQ=", "messageId": "8254002311197919", "publishTime": "2023-09-20T16:54:29.312Z" }, "subscription": "projects/MY_PROJECT/subscriptions/eventarc-us-central1-message-payload-trigger-sub-741" }
SGVsbG8gV29ybGQ=
値を Base64 形式からデコードすると「Hello World」が返されます。