Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
このページでは、Pub/Sub トピックの変更に応じて Cloud Composer DAG をトリガーすることで、イベントベースのプッシュ アーキテクチャを作成する方法について説明します。このチュートリアルの例は、DAG プロセスの一部として、サブスクリプション管理など、Pub/Sub 管理の全サイクルを処理するための方法を示しています。これは、DAG をトリガーする必要があるものの、追加のアクセス権を設定したくない一般的なユースケースに適しています。
たとえば、セキュリティ上の理由から Cloud Composer 環境への直接アクセスを提供しない場合は、Pub/Sub を介して送信されるメッセージをソリューションとして使用できます。Pub/Sub メッセージを作成し、Pub/Sub トピックで公開する Cloud Run functions の関数を構成できます。次に、Pub/Sub メッセージを pull してこれらのメッセージを処理する DAG を作成できます。
この例では、Cloud Run functions の関数を作成し、2 つの DAG をデプロイします。最初の DAG は Pub/Sub メッセージを pull し、Pub/Sub メッセージ コンテンツに沿って 2 番目の DAG をトリガーします。
このチュートリアルは、Python と Google Cloud コンソールの知識があることを前提としています。
目標
費用
このチュートリアルでは、課金対象である次の Google Cloudコンポーネントを使用します。
このチュートリアルを終了した後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
始める前に
このチュートリアルでは Google Cloudプロジェクトが必要です。プロジェクトは、次のように構成します:
Google Cloud コンソールで、プロジェクトを選択または作成します。
プロジェクトに対して課金が有効になっていることを確認します。、プロジェクトで課金が有効になっているかどうかを確認する方法をご覧ください。
Google Cloud プロジェクト ユーザーに、必要なリソースを作成するための次のロールがあることを確認します。
- サービス アカウント ユーザー (
roles/iam.serviceAccountUser) - Pub/Sub 編集者(
roles/pubsub.editor) - 環境とストレージ オブジェクトの管理者
(
roles/composer.environmentAndStorageObjectAdmin) - Cloud Run functions 管理者(
roles/cloudfunctions.admin) - ログビューア (
roles/logging.viewer)
- サービス アカウント ユーザー (
Cloud Run functions の関数を実行するサービス アカウントに、プロジェクトで Pub/Sub にアクセスするための十分な権限が付与されていることを確認します。デフォルトでは、Cloud Run functions は App Engine のデフォルトのサービス アカウントを使用します。このサービス アカウントには編集者のロールがあり、このチュートリアルに必要な権限が付与されています。
プロジェクトでAPI を有効にする
コンソール
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM
role (roles/serviceusage.serviceUsageAdmin), which
contains the serviceusage.services.enable permission. Learn how to grant
roles.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM
role (roles/serviceusage.serviceUsageAdmin), which contains the
serviceusage.services.enable permission. Learn how to grant
roles.
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Terraform スクリプトに次のリソース定義を追加して、プロジェクトで Cloud Composer API を有効にします。
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
<PROJECT_ID> は、プロジェクトのプロジェクト ID に置き換えます。例: example-project。
Cloud Composer 環境を作成する
この手順の一環として、Cloud Composer v2 API サービス エージェント拡張機能(roles/composer.ServiceAgentV2Ext)ロールを Composer サービス エージェント アカウントに付与します。Cloud Composer は、このアカウントを使用して Google Cloud プロジェクトでオペレーションを実行します。
Pub/Sub トピックの作成
この例では、Pub/Sub トピックに push されたメッセージに応答して DAG をトリガーします。この例で使用する Pub/Sub トピックを作成します。
コンソール
Google Cloud コンソールで、[Pub/Sub トピック] ページに移動します。
[トピックを作成] をクリックします。
[トピック ID] フィールドに、トピックの ID として
dag-topic-triggerを入力します。他のオプションはデフォルトのままにします。
[トピックを作成] をクリックします。
gcloud
トピックを作成するには、Google Cloud CLI で gcloud pubsub topics create コマンドを実行します。
gcloud pubsub topics create dag-topic-trigger
Terraform
Terraform スクリプトに次のリソース定義を追加します。
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
<PROJECT_ID> は、プロジェクトのプロジェクト ID に置き換えます。例: example-project。
DAG をアップロードする
DAG を環境にアップロードします。
- 次の DAG ファイルをローカル コンピュータに保存します。
<PROJECT_ID>は、プロジェクトのプロジェクト ID に置き換えます。例:example-project。- 編集済み DAG ファイルをご利用の環境にアップロードします。
サンプルコードには、trigger_dag と target_dag の 2 つの DAG が含まれています。
trigger_dag DAG は Pub/Sub トピックに登録し、Pub/Sub メッセージを pull し、Pub/Sub メッセージ データの DAG ID で指定された別の DAG をトリガーします。この例では、trigger_dag が target_dag DAG をトリガーし、タスクログにメッセージを出力します。
trigger_dag DAG には、次のタスクが含まれています。
subscribe_task: Pub/Sub トピックへのサブスクライブpull_messages_operator:PubSubPullOperatorを使用して Pub/Sub メッセージ データを読み取ります。trigger_target_dag: Pub/Sub トピックから pull されたメッセージ内のデータに応じて、別の DAG(この例ではtarget_dag)をトリガーします。
target_dag DAG には、output_to_logs という 1 つのタスクのみが含まれています。このタスクは、1 秒の遅延でタスクログにメッセージを出力します。
Pub/Sub トピックにメッセージを公開する Cloud Run functions の関数をデプロイする
このセクションでは、Pub/Sub トピックにメッセージを公開する Cloud Run functions の関数をデプロイします。
Cloud Run functions の関数を作成し、その構成を指定する
コンソール
Google Cloud コンソールで、Cloud Run functions ページに移動します。
[関数を作成] をクリックします。
[環境] フィールドで [第 1 世代] を選択します。
[関数名] フィールドに、関数の名前(
pubsub-publisher)を入力します。[トリガー] フィールドで、[HTTP] を選択します。
[認証] セクションで、[未認証の呼び出しを許可] を選択します。このオプションは、未認証のユーザーに HTTP 関数を呼び出せる権限を付与します。
[保存] をクリックします。
[次へ] をクリックして、[コード] ステップに進みます。
Terraform
Terraform から関数のソースコードを管理する簡単な方法がないため、この手順では Google Cloud コンソールの使用を検討してください。
この例では、Cloud Storage バケットを作成し、このバケットにファイルを保存して、バケットからのファイルを Cloud Run functions の関数のソースとして使用することで、Cloud Run functions の関数をローカルの zip アーカイブ ファイルからアップロードする方法を示します。この方法を使用する場合、新しいアーカイブ ファイルを作成しても、Terraform は関数のソースコードを自動的に更新しません。関数コードを再アップロードするには、アーカイブのファイル名を変更します。
pubsub_publisher.pyファイルとrequirements.txtファイルをダウンロードします。pubsub_publisher.pyファイルで、<PROJECT_ID>をプロジェクトのプロジェクト ID に置き換えます。例:example-projectpbusub_publisner.pyファイルとrequirements.txtファイルを含むpubsub_function.zipという名前の ZIP アーカイブを作成します。- zip アーカイブを、Terraform スクリプトが保存されているディレクトリに保存します。
- Terraform スクリプトに次のリソース定義を追加し、
<PROJECT_ID>をプロジェクトのプロジェクト ID に置き換えます。
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Cloud Run functions 関数のコードのパラメータを指定する
コンソール
[コード] ステップの [ランタイム] フィールドで、関数が使用する言語ランタイムを選択します。この例では、[Python 3.10] を選択します。
[エントリ ポイント] フィールドに、「
pubsub_publisher」と入力します。これは、Cloud Run functions の関数の実行時に実行されるコードです。このフラグの値は、ソースコード内に存在する関数名または完全修飾クラス名である必要があります。
Terraform
このステップをスキップします。Cloud Run functions の関数のパラメータは、すでに google_cloudfunctions_function リソースで定義されています。
Cloud Run functions の関数のコードをアップロードする
コンソール
[ソースコード] フィールドで、関数のソースコードの提供方法に適したオプションを選択します。このチュートリアルでは、Cloud Run functions の関数のインライン エディタを使用して関数コードを追加します。または、ZIP ファイルをアップロードするか、Cloud Source Repositories を使用することもできます。
- 次のコードサンプルを main.py ファイルに配置します。
<PROJECT_ID>は、プロジェクトのプロジェクト ID に置き換えます。例:example-project
Terraform
このステップをスキップします。Cloud Run functions の関数のパラメータは、すでに google_cloudfunctions_function リソースで定義されています。
Cloud Run functions の関数の依存関係を指定する
コンソール
requirements.txt メタデータ ファイルで関数の依存関係を指定します。
関数をデプロイすると、Cloud Run functions は requirements.txt ファイル内で宣言されている依存関係を、パッケージごとに 1 行ずつダウンロードしてインストールします。このファイルは、関数コードを含む main.py ファイルと同じディレクトリに配置する必要があります。詳細については、pip ドキュメントの要件ファイルをご覧ください。
Terraform
このステップをスキップします。Cloud Run functions の関数の依存関係は、pubsub_function.zip アーカイブの requirements.txt ファイルで定義されます。
Cloud Run functions の関数をデプロイする
コンソール
[デプロイ] をクリックします。デプロイが正常に完了すると、Google Cloud コンソールの [Cloud Run functions] ページに関数と緑色のチェックマークが表示されます。
Cloud Run functions の関数を実行するサービス アカウントには、プロジェクト内に Pub/Sub にアクセスするための十分な権限があることを確認してください。
Terraform
Terraform を初期化します。
terraform init構成を確認して、Terraform が作成または更新するリソースが想定どおりであることを確認します。
terraform plan次のコマンドを実行して、構成が有効かどうかを確認します。
terraform validate次のコマンドを実行し、プロンプトで「yes」と入力して、Terraform 構成を適用します。
terraform apply
Terraform に「Apply complete!」というメッセージが表示されるまで待ちます。
Google Cloud コンソールの UI でリソースに移動して、Terraform によって作成または更新されたことを確認します。
Cloud Run functions の関数をテストする
関数が Pub/Sub トピックでメッセージをパブリッシュし、サンプルの DAG が意図したとおりに機能することを確認するには:
DAG がアクティブであることを確認します。
Google Cloud コンソールで、[環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[DAG] タブに移動します。
trigger_dagという名前の DAG とtarget_dagという名前の DAG の [State] 列の値を確認します。両方の DAG がActive状態である必要があります。
テスト用の Pub/Sub メッセージを push します。環境変数にキーを保存するには、Cloud Shell で次のように入力します。
Google Cloud コンソールで、[関数] ページに移動します。
関数の名前をクリックします。
pubsub-publisher[テスト] タブに移動します。
[トリガー イベントの構成] セクションで、次の JSON の Key-Value を入力します。
{"message": "target_dag"}このメッセージは後でテスト DAG をトリガーするため、Key-Value ペアを変更しないでください。[テストコマンド] セクションで、[Cloud Shell でテストする] をクリックします。
[Cloud Shell ターミナル] で、コマンドが自動的に表示されるまで待ちます。
Enterキーを押して、このコマンドを実行します。[Cloud Shell の承認] メッセージが表示されたら、[承認] をクリックします。
メッセージの内容が Pub/Sub メッセージに対応していることを確認します。この例では、出力メッセージは、関数からのレスポンスとして
Message b'target_dag' with message_length 10 published toで始まる必要があります。
target_dagがトリガーされたことを確認します。trigger_dagの新しい DAG 実行が完了するまで、少なくとも 1 分間待ちます。Google Cloud コンソールで、[環境] ページに移動します。
環境のリストで、ご利用の環境の名前をクリックします。[環境の詳細] ページが開きます。
[DAG] タブに移動します。
trigger_dagをクリックすると DAG の詳細ページに移動します。[実行] タブに、trigger_dagDAG の DAG 実行のリストが表示されます。この DAG は 1 分ごとに実行され、関数から送信されたすべての Pub/Sub メッセージを処理します。メッセージが送信されなかった場合、DAG 実行ログで
trigger_targetタスクはSkippedとしてマークされます。DAG がトリガーされた場合、trigger_targetタスクはSuccessとしてマークされます。最近の DAG 実行をいくつか調べて、3 つのタスク(
subscribe_task、pull_messages_operator、trigger_target)すべてがSuccessステータスになっている DAG 実行を見つけます。[DAG] タブに戻り、
target_dagDAG の [成功した実行] 列に成功した実行が 1 つリストされていることを確認します。
概要
このチュートリアルでは、Cloud Run functions を使用して Pub/Sub トピックでメッセージをパブリッシュし、Pub/Sub トピックにサブスクライブして Pub/Sub メッセージを pull し、メッセージ データの DAG ID で指定された別の DAG をトリガーする DAG をデプロイする方法を学びました。
また、Pub/Sub サブスクリプションの作成と管理、および DAG のトリガーには、このチュートリアルの範囲外である別の方法も存在します。たとえば、指定されたイベントが発生したときに、Cloud Run functions を使用して Airflow DAG をトリガーできます。チュートリアルを参照し、他のGoogle Cloud 機能を試します。
クリーンアップ
このチュートリアルで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
プロジェクトの削除
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
リソースを個別に削除する
複数のチュートリアルとクイックスタートを実施する予定がある場合は、プロジェクトを再利用すると、プロジェクトの割り当て上限を超えないようにできます。
コンソール
- Cloud Composer 環境を削除します。この手順で環境のバケットも削除します。
- Pub/Sub トピックを削除します、
dag-topic-trigger。 Cloud Run functions の関数を削除する
Google Cloud コンソールで、Cloud Run functions に移動します。
削除する関数のチェックボックスをクリックします、
pubsub-publisher。[削除] をクリックして、手順どおりに実行します。
Terraform
- Terraform スクリプトに、プロジェクトで依然として必要とされるリソースのエントリが含まれていないことを確認します。たとえば、一部の API を有効にしたまま、IAM 権限をまだ割り当てておくことができます(Terraform スクリプトにこのような定義を追加した場合)。
terraform destroyを実行します。- 環境のバケットを手動で削除します。Cloud Composer では自動的に削除されません。これは、 Google Cloud コンソールまたは Google Cloud CLI から行うことができます。
次のステップ
- DAG のテスト
- HTTP 関数のテスト
- Cloud Run 関数をデプロイする
- その他の Google Cloud 機能を試す。チュートリアルをご覧ください。