このガイドでは、Antigravity 用の Google Cloud Data Agent Kit 拡張機能でオーケストレーション パイプラインを作成してデプロイする方法について説明します。
このパイプラインの例では、Managed Service for Apache Spark で PySpark スクリプトを実行します。
Antigravity からオーケストレーション パイプラインをローカル バージョンとしてデプロイするか、main ブランチへの変更をマージするときなど、GitHub アクションを介してデプロイできます。このドキュメントでは、オーケストレーション パイプラインのローカル バージョンをデプロイする方法について説明します。
始める前に
始める前に、次のことを行ってください。
- Antigravity 用の Data Agent Kit 拡張機能をインストールします。
- 設定を構成します。
- GitHub リポジトリを Antigravity ワークスペースに追加して、オーケストレーション パイプラインと、スクリプトなどのアセットを保存します。
必要な IAM ロールを確認する
プロジェクトでリソースを作成し、オーケストレーション パイプラインをデプロイして実行する権限を取得するには、必要なロールを付与するよう管理者に依頼してください。
Managed Service for Apache Airflow 環境を作成して管理し、関連付けられたバケット内のオブジェクトを管理するには、次のロールが必要です。これらのユーザーロールの詳細については、Managed Service for Apache Airflow のドキュメントのユーザーにロールを付与するをご覧ください。
- 環境とストレージ オブジェクトの管理者 (composer.environmentAndStorageObjectAdmin)
- サービス アカウント ユーザー(
iam.serviceAccountUser)
BigQuery と Cloud Storage のリソースを操作するには、次のロールが必要です。
- BigQuery データ編集者(
roles/bigquery.dataEditor) - Storage オブジェクト管理者(
roles/storage.objectAdmin)
アクセスするリソースによっては、拡張機能を使用してオーケストレーション パイプラインを操作できるロールに加えて、追加のロールが必要になる場合があります。
サービス アカウントを作成して IAM ロールを付与する
Managed Airflow Gen 3 環境に一意のサービス アカウントを使用します。サービス アカウントは、Managed Airflow Gen 3 環境を作成し、デプロイするすべてのオーケストレーション パイプラインを実行します。
管理者に次の手順を完了するよう依頼してください。
- IAM ドキュメントの説明に沿って、サービス アカウントを作成します。
- サービス アカウントに Composer ワーカー(
composer.worker)ロールを付与します。ほとんどの場合、このロールには必要な権限が付与されています。
ベスト プラクティスとして、Google Cloud プロジェクト内の他のリソースにアクセスする必要がある場合は、オーケストレーション パイプラインのオペレーションに必要な場合にのみ、このサービス アカウントに追加の権限を付与します。
オーケストレーション パイプラインの Google Cloud リソースを作成する
このステップでは、オーケストレーション パイプラインの Google Cloud リソースを作成します。
Managed Airflow Gen 3 環境を作成する
次の構成で Managed Airflow Gen 3 環境を作成します。
- 環境名: 後でオーケストレーション パイプラインの構成に使用する名前を入力します。例:
example-pipeline-scheduler - ロケーション: ロケーションを選択します。このガイドのすべてのリソースを同じロケーションに作成することをおすすめします。例:
us-central1 - サービス アカウント: この環境用に作成したサービス アカウントを選択します。
次の Google Cloud CLI コマンドの例は、構文を示しています。
gcloud composer environments create example-pipeline-scheduler \
--location us-central1 \
--image-version composer-3-airflow-2 \
--service-account "example-account@example-project.iam.gserviceaccount.com"
スケジューラ構成に環境パラメータを追加する
オーケストレーション パイプラインを実行する Managed Airflow 環境の接続の詳細を指定します。
Google Cloud Data Agent Kit 設定エディタを使用して、作成した環境の構成パラメータを追加します。
- アクティビティ バーの Google Cloud Data Agent Kit アイコンをクリックします。
- [設定] を展開し、[設定] をクリックします。
- [スケジューラ] を選択します。
- 先ほど作成した Managed Airflow Gen 3 環境のパラメータを入力します。
- プロジェクト ID: 環境が配置されているプロジェクトの名前。例:
example-project - リージョン: 環境が配置されているリージョン。
us-central1 - 環境: 環境の名前。
example-pipeline-scheduler
- プロジェクト ID: 環境が配置されているプロジェクトの名前。例:
- [保存] をクリックします。
パイプライン アーティファクト用のバケットを作成する
Managed Airflow 環境と同じプロジェクトに Cloud Storage バケットを作成し、example-pipelines-bucket と同様の名前を付けます。このバケットは、Managed Service for Apache Spark ジョブを保存するために必要です。
一部のパイプライン アクション(結果を Cloud Storage バケットに出力するなど)。
BigQuery で新しいデータセットとテーブルを作成する
このガイドでは、BigQuery テーブルにデータを書き込むパイプラインについて説明します。プロジェクトに次の BigQuery リソースを作成します。
wordcount_datasetという名前の新しいデータセットを作成します。wordcount_outputという名前の 新しい BigQuery テーブルを作成します。
パイプライン アセットを追加する
このガイドでは、PySpark を使用して一般的なデータ エンジニアリング タスク(ETL: 抽出、変換、読み込み)を実行する方法について説明します。BigQuery からデータを読み取り、データを変換(単語数)して、BigQuery に読み込みます。
非エージェント型
次のファイルをリポジトリの /scripts フォルダに追加します。このスクリプトを Managed Service for Apache Spark で実行するパイプライン アクションは、後で追加します。
wordcount.py ファイルの例:
#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)
# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')
# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()
# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()
print(f"Successfully wrote word counts to BigQuery table: {destination_table}")
次のように置き換えます。
- ARTIFACTS_BUCKET_NAME: 前に作成した Cloud Storage バケットの名前。例:
example-pipelines-bucket - PROJECT_ID: 環境が存在するプロジェクトの名前。例:
example-project
エージェント
エージェントに、リポジトリの /scripts フォルダにサンプル PySpark スクリプトを生成するように指示します。後で、Managed Service for Apache Spark でこのスクリプトを実行するパイプライン アクションを追加します。
次のようなプロンプトを入力します。
I want to create a PySpark script that does the following:
1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.
My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.
Save the resulting script to /scripts as wordcount.py
リポジトリでオーケストレーション パイプラインを初期化する
オーケストレーション パイプラインを初期化すると、Antigravity 用の Data Agent Kit 拡張機能によって、次のものを含むスキャフォールディングが作成されます。
- オーケストレーション パイプラインの YAML ファイル: スケジュールは含まれているが、アクションが定義されていないパイプライン定義の例。
deployment.yaml: パイプラインのデプロイ方法を定義するパイプライン デプロイ構成の例。このファイルは、Managed Airflow 環境、アーティファクト バケット、パイプライン アクションで使用されるその他のリソースに必要な構成を示しています。.github/workflows/deploy.yaml: GitHub リポジトリのmainブランチに変更をマージしたときにパイプラインをデプロイする GitHub アクションを設定します。.github/workflows/validate.yaml: パイプラインのデプロイ後に検証する GitHub アクションを設定します。
このドキュメントの後の手順では、Antigravity 用の Data Agent Kit 拡張機能を使用してこれらの定義を拡張し、オーケストレーション パイプラインをローカルで作成してデプロイします。
非エージェント型
オーケストレーション パイプラインを初期化する手順は次のとおりです。
- アクティビティ バーの Google Cloud Data Agent Kit アイコンをクリックします。
- [データ エンジニアリング] を開いて、[オーケストレーション パイプラインを初期化] をクリックします。
- 新しいオーケストレーション パイプラインのパラメータを入力します。
- パイプライン ID: パイプラインの ID を入力します。例:
example-pipeline - Google Cloud プロジェクト ID: 環境が存在するプロジェクトの名前。例:
example-project - リージョン: 環境が存在するリージョン。
us-central1 - 環境 ID: 開発に使用する環境の名前。例:
dev/staging Scheduler Managed Service for Apache Airflow Environment: パイプラインをオーケストレートする環境の名前。このドキュメントでは、このパラメータで同じ環境を指定します。
アーティファクト バケット: パイプライン アーティファクトに使用されるバケットの名前(
gs://接頭辞は付けない)。例:example-pipelines-bucket[次へ] をクリックします。
[Initialize] をクリックします。
パイプラインを初期化するワークスペースを指定します。
エージェント
エージェントに、リポジトリのオーケストレーション パイプラインのスキャフォールディングを作成するように依頼します。
次のようなプロンプトを入力します。
Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.
The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.
The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.
Store pipeline artifacts in example-pipelines-bucket.
リポジトリでパイプラインを初期化した後、新しいスキャフォールディングによって行った構成の変更が上書きされるため、再度初期化することはできません。新しいパイプラインを追加するには、プロジェクトに新しいパイプライン定義ファイルを作成し、デプロイ構成に追加します。
パイプラインに新しいタスクを追加する
初期パイプライン構成にはアクションがないため、PySpark スクリプトを実行するアクションを追加します。
非エージェント型
パイプラインを編集する手順は次のとおりです。
- アクティビティ バーの Google Cloud Data Agent Kit アイコンをクリックします。
- [データ エンジニアリング]、[Orchestration Pipelines] の順に展開します。
- [
example-pipeline.yaml] を選択します。選択したパイプラインのパイプライン エディタが開きます。 - 省略可: [スケジュール トリガー] ノードを選択します。cron 式とスケジュールの開始時刻と終了時刻を指定して、パイプラインのスケジュールを調整できます。新しく初期化されたパイプラインのデフォルトのスケジュールは
0 2 * * *で、毎日午前 2 時に実行されます。
新しいタスクを追加します。このガイドでは、先ほど追加した PySpark スクリプトを実行する PySpark タスクを追加します。
- [最初のアクティビティを追加] をクリックして、新しいタスクノードを追加します。
- [PySpark スクリプトを実行] と
script/wordcount.pyファイルを選択します。
[PySpark スクリプトを実行] パネルが開きます。
- [Spark クラスタモード] で、[サーバーレス Spark] を選択します。
- [ロケーション] で、環境が存在するロケーションを指定します。例:
us-central1 - [保存] をクリックします。
エージェント
次のプロンプトを実行します。
Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.
パイプラインのローカル バージョンをデプロイする
パイプラインのローカル バージョンをデプロイして、正しく構成されていることを確認します。
オーケストレーション パイプラインのローカル バージョンをデプロイすると、Antigravity 用 Data Agent Kit 拡張機能は、パイプライン バンドルのローカル バージョンを Managed Airflow 環境にアップロードして実行します。ローカル デプロイは、開発環境で作業する場合に使用することを目的としています。
deploy コマンドは、一時停止されていないスケジュールをデプロイします。これを防ぐには、[パイプラインの管理] ペインでスケジュールを手動で一時停止します。パイプライン YAML ファイルを編集して、triggers: - schedule ブロックをコメントアウトまたは削除することもできます。
非エージェント型
サンプル オーケストレーション パイプラインのローカル バージョンをデプロイする手順は次のとおりです。
- アクティビティ バーの Google Cloud Data Agent Kit アイコンをクリックします。
- [データ エンジニアリング]、[Orchestration Pipelines] の順に開きます。
- [
example-pipeline.yaml] を選択します。選択したパイプラインのパイプライン エディタが開きます。 - [パイプラインを実行] を選択し、前に作成した開発環境またはステージング環境を選択します。
エージェント
次のプロンプトを実行します。
Deploy my pipeline
パイプラインの実行をモニタリングし、実行ログを確認する
パイプラインをデプロイすると、詳細情報、パイプライン実行の履歴、パイプライン実行ログを確認できます。
- アクティビティ バーの Google Cloud Data Agent Kit アイコンをクリックします。
- [データ エンジニアリング] を開いて、[パイプライン管理] を選択します。
- パイプラインの名前(
example-pipeline)をクリックして、実行履歴を表示します。特定の日付の実行リストには、個々のパイプライン実行と、各パイプライン実行内の個々のアクションの内訳が表示されます。 - タスク ID をクリックして、タスク実行ログを表示します。この例の PySpark スクリプトは Managed Service for Apache Spark で実行されたため、タスクログには Batch ログへのリンクが含まれます。
パイプラインの障害のトラブルシューティングと修正
パイプラインが失敗すると、[パイプライン管理] ペインに [診断] ボタンが表示されます。
エージェント
[診断] ボタンをクリックすると、エージェントはパイプラインの障害のトラブルシューティングを行うためのプロンプトを生成します。プロンプトがクリップボードにコピーされるか、新しいチャット セッションで開きます。
エージェントは、ログの収集、デプロイされたコードとワークスペースの相互チェック、根本原因分析(RCA)の生成に重点を置き、パイプラインのトラブルシューティングに特化したスキルを使用します。
RCA を受け取った後の次のステップは次のとおりです。
- 現在のワークスペースで根本原因分析を適用します。
- エージェントに新しいブランチを作成して、そこに変更を適用するよう依頼します。
- RCA の詳細を記載した Cloud カスタマーケア チケットを開きます。
拡張機能に関する問題のトラブルシューティングについては、Antigravity 用 Data Agent Kit 拡張機能のトラブルシューティングをご覧ください。