このガイドでは、Visual Studio Code 用の Google Cloud Data Agent Kit 拡張機能でオーケストレーション パイプラインを作成してデプロイする方法について説明します。
サンプル パイプラインは、Managed Service for Apache Spark で PySpark スクリプトを実行します。
オーケストレーション パイプラインは、ローカル バージョンとして VS Code からデプロイすることも、変更を main ブランチにマージするときなど、GitHub
アクションを介してデプロイすることもできます。このドキュメントでは、オーケストレーション パイプラインのローカル
バージョンをデプロイする方法について説明します。
始める前に
始める前に、次の操作を行います。
- VS Code 用の Data Agent Kit 拡張機能をインストールします。
- 設定を構成します。
- オーケストレーション パイプラインやスクリプトなどのアセットを保存するために、GitHub リポジトリを VS Code ワークスペースに追加します。
必要な IAM ロールを確認する
プロジェクトでリソースを作成し、オーケストレーション パイプラインをデプロイして実行する権限を取得するには、必要なロールを付与するよう管理者に依頼してください。
Managed Service for Apache Airflow 環境を作成して管理し、関連するバケット内のオブジェクトを管理するには、次のロールが必要です。これらのユーザーロールの詳細については、Managed Service for Apache Airflow ドキュメントのユーザーにロールを付与するをご覧ください。
- 環境と Storage オブジェクトの管理者 (composer.environmentAndStorageObjectAdmin)
- サービス アカウント ユーザー (
iam.serviceAccountUser)
BigQuery リソースと Cloud Storage リソースを操作するには、次のロールが必要です。
- BigQuery データ編集者 (
roles/bigquery.dataEditor) - Storage オブジェクト管理者 (
roles/storage.objectAdmin)
アクセスするリソースによっては、拡張機能を使用してオーケストレーション パイプラインを操作できるロールに加えて、追加のロールが必要になる場合があります。
サービス アカウントを作成して IAM ロールを付与する
Managed Airflow Gen 3 環境には一意のサービス アカウントを使用します。 このサービス アカウントは、Managed Airflow Gen 3 環境を作成し、デプロイするすべてのオーケストレーション パイプラインを実行します。
管理者に次の手順を完了するよう依頼してください。
- IAM ドキュメントの説明に沿って、サービス アカウントを作成します。
- サービス アカウントにComposer ワーカー (
composer.worker)ロールを付与します。このロールは、ほとんどの場合に必要な権限を提供します。
ベスト プラクティスとして、プロジェクト内の他のリソースにアクセスする必要がある場合は、オーケストレーション パイプラインの運用に必要な場合にのみ、このサービス アカウントに追加の権限を付与してください。Google Cloud
オーケストレーション パイプラインのリソースを作成する Google Cloud
このステップでは、オーケストレーション パイプラインのリソースを作成します。 Google Cloud
Managed Airflow Gen 3 環境を作成する
次の構成で Managed Airflow Gen 3 環境を作成します。
- 環境名: 後でオーケストレーション パイプラインの構成に使用する名前を入力します。例:
example-pipeline-scheduler。 - ロケーション: ロケーションを選択します。このガイドのすべてのリソースを同じロケーションに作成することをおすすめします。例:
us-central1。 - サービス アカウント: この 環境用に作成したサービス アカウントを選択します。
次の Google Cloud CLI コマンドの例は、構文を示しています。
gcloud composer environments create example-pipeline-scheduler \
--location us-central1 \
--image-version composer-3-airflow-2 \
--service-account "example-account@example-project.iam.gserviceaccount.com"
スケジューラ構成に環境パラメータを追加する
オーケストレーション パイプラインを実行する Managed Airflow 環境の接続の詳細を指定します。
Google Cloud Data Agent Kit Settings エディタを使用して作成した環境の構成パラメータを追加します。
- アクティビティ バーの [Google Cloud Data Agent Kit] アイコンをクリックします。
- [設定] を開いて、[設定] をクリックします。
- [スケジューラ] を選択します。
- 以前に作成した Managed Airflow Gen 3 環境のパラメータを入力します。
- プロジェクト ID: 環境が配置されているプロジェクトの名前。
例:
example-project。 - リージョン: 環境が配置されているリージョン。例:
us-central1。 - 環境: 環境の名前。例:
example-pipeline-scheduler。
- プロジェクト ID: 環境が配置されているプロジェクトの名前。
例:
- [保存] をクリックします。
パイプライン アーティファクトのバケットを作成する
Cloud Storage バケットを作成し、
Managed Airflow 環境と同じプロジェクトに配置して、
example-pipelines-bucket のような名前を付けます。このバケットは、Managed Service for Apache Spark ジョブを保存するために必要です。
一部のパイプライン アクション(結果を Cloud Storage バケットに出力するなど)。
BigQuery で新しいデータセットとテーブルを作成する
このガイドでは、データを BigQuery テーブルに書き込むパイプラインについて説明します。プロジェクトに次の BigQuery リソースを作成します。
- 新しいデータセットを作成します 名前は
wordcount_datasetです。 - 新しい BigQuery テーブルを
wordcount_outputという名前で作成します。
パイプライン アセットを追加する
このガイドでは、PySpark を使用して一般的なデータ エンジニアリング タスク(ETL: 抽出、変換、読み込み)を実行し、BigQuery から読み取り、データを変換(ワードカウント)して、BigQuery に読み込む方法について説明します。
エージェントを使用しない
次のファイルをリポジトリの /scripts フォルダに追加します。後で、このスクリプトを Managed Service for Apache Spark で実行するパイプライン アクションを追加します。
wordcount.py ファイルの例:
#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)
# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')
# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()
# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()
print(f"Successfully wrote word counts to BigQuery table: {destination_table}")
次のように置き換えます。
- ARTIFACTS_BUCKET_NAME:前に作成した Cloud Storage バケットの名前
。例:
example-pipelines-bucket。 - PROJECT_ID: 環境が存在するプロジェクトの名前。例:
example-project。
エージェントを使用する
エージェントに、リポジトリの /scripts フォルダにサンプル PySpark スクリプトを生成するように指示します。後で、このスクリプトを Managed Service for Apache Spark で実行するパイプライン アクションを追加します。
次のようなプロンプトを入力します。
I want to create a PySpark script that does the following:
1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.
My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.
Save the resulting script to /scripts as wordcount.py
リポジトリでオーケストレーション パイプラインを初期化する
オーケストレーション パイプラインを初期化すると、VS Code 用の Data Agent Kit 拡張機能によって、次のようなスキャフォールディングが作成されます。
- オーケストレーション パイプライン YAML ファイル: スケジュールを含むが、アクションが定義されていないパイプライン定義の例。
deployment.yaml: パイプラインのデプロイ方法を定義するパイプライン デプロイ構成の例。このファイルは、Managed Airflow 環境、アーティファクト バケット、パイプライン アクションで使用されるその他のリソースに必要な構成を示しています。.github/workflows/deploy.yaml: GitHub リポジトリのmainブランチに変更をマージしたときにパイプラインをデプロイする GitHub アクションを設定します。.github/workflows/validate.yaml: パイプラインのデプロイ後に検証する GitHub アクションを設定します。
このドキュメントの後のステップでは、VS Code 用の Data Agent Kit 拡張機能を使用してこれらの定義を拡張し、オーケストレーション パイプラインをローカルで作成してデプロイします。
エージェントを使用しない
オーケストレーション パイプラインを初期化する手順は次のとおりです。
- アクティビティ バーの [Google Cloud Data Agent Kit] アイコンをクリックします。
- [Data Engineering] を開いて、[Initialize orchestration pipeline] をクリックします。
- 新しいオーケストレーション パイプラインのパラメータを入力します。
- パイプライン ID: パイプラインの ID を入力します。例:
example-pipeline。 - Google Cloud プロジェクト ID: 環境が
存在するプロジェクトの名前。例:
example-project。 - リージョン: 環境が存在するリージョン。例:
us-central1。 - 環境 ID: 開発に使用する環境の名前。
例:
dev/staging。 Scheduler Managed Service for Apache Airflow Environment: パイプラインをオーケストレートする環境の名前。このドキュメントでは、このパラメータに同じ環境を指定します。
アーティファクト バケット: パイプライン アーティファクトに使用されるバケットの名前(
gs://接頭辞なし)。例:example-pipelines-bucket。[次へ] をクリックします。
[Initialize] をクリックします。
パイプラインを初期化するワークスペースを指定します。
エージェントを使用する
エージェントに、リポジトリのオーケストレーション パイプラインのスキャフォールディングを作成するように指示します。
次のようなプロンプトを入力します。
Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.
The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.
The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.
Store pipeline artifacts in example-pipelines-bucket.
リポジトリでパイプラインを初期化すると、新しいスキャフォールディングによって行った構成変更が上書きされるため、再度初期化することはできません。 新しいパイプラインを追加するには、プロジェクトに新しいパイプライン定義ファイルを作成して、デプロイ構成に追加します。
パイプラインに新しいタスクを追加する
初期パイプライン構成にはアクションがないため、PySpark スクリプトを実行するアクションを追加します。
エージェントを使用しない
パイプラインを編集する手順は次のとおりです。
- アクティビティ バーの [Google Cloud Data Agent Kit] アイコンをクリックします。
- [Data Engineering]、[Orchestration Pipelines] の順に開きます。
example-pipeline.yamlを選択します。選択したパイプラインのパイプライン エディタが開きます。- 省略可: [Schedule trigger] ノードを選択します。cron のような式とスケジュールの開始時刻と終了時刻を指定して、パイプラインのスケジュールを調整できます。新しく初期化されたパイプラインのデフォルト スケジュールは
0 2 * * *で、毎日午前 2 時に実行されます。
新しいタスクを追加します。このガイドでは、以前に追加した PySpark スクリプトを実行する PySpark タスクを追加します。
- [Add first task] をクリックして、新しいタスクノードを追加します。
- [Execute PySpark script] と
script/wordcount.pyファイルを選択します。
[Execute PySpark script] パネルが開きます。
- [Spark Cluster Mode] で [Serverless Spark] を選択します。
- [Location] で、環境が存在するロケーションを指定します。
例:
us-central1。 - [保存] をクリックします。
エージェントを使用する
次のプロンプトを実行します。
Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.
パイプラインのローカル バージョンをデプロイする
パイプラインのローカル バージョンをデプロイして、正しく構成されていることを確認します。
オーケストレーション パイプラインのローカル バージョンをデプロイすると、VS Code 用の Data Agent Kit 拡張機能によって、パイプライン バンドルのローカル バージョンが Managed Airflow 環境にアップロードされて実行されます。ローカル デプロイは、開発環境で作業する場合に使用することを目的としています。
deploy コマンドは、一時停止されていないスケジュールをデプロイします。これを防ぐには、[Pipelines Management]
ペインでスケジュールを手動で一時停止します。パイプライン YAML ファイルを編集して、triggers: - schedule
ブロックをコメントアウトまたは削除することもできます。
エージェントを使用しない
オーケストレーション パイプラインのローカル バージョンをデプロイする手順は次のとおりです。
- アクティビティ バーの [Google Cloud Data Agent Kit] アイコンをクリックします。
- [Data Engineering]、[Orchestration Pipelines] の順に開きます。
example-pipeline.yamlを選択します。選択したパイプラインのパイプライン エディタが開きます。- [Run pipeline] を選択し、以前に作成した開発環境またはステージング環境を選択します。
エージェントを使用する
次のプロンプトを実行します。
Deploy my pipeline
パイプラインの実行をモニタリングして実行ログを確認する
パイプラインをデプロイすると、詳細情報、パイプライン実行の履歴、パイプライン実行ログを確認できます。
- アクティビティ バーの [Google Cloud Data Agent Kit] アイコンをクリックします。
- [Data Engineering] を開いて、[Pipelines management] を選択します。
- パイプラインの名前(
example-pipeline)をクリックして、実行履歴を表示します。特定の日付の実行リストで、個々のパイプライン実行と、各パイプライン実行内の個々のアクションの内訳を確認できます。 - タスク ID をクリックして、タスク実行ログを表示します。サンプル PySpark スクリプトは Managed Service for Apache Spark で実行されたため、タスクログにはバッチログへのリンクが表示されます。
パイプラインの失敗をトラブルシューティングして修正する
パイプラインが失敗すると、[Pipelines management] ペインに [Diagnose] ボタンが表示されます。
エージェントを使用する
[Diagnose] ボタンをクリックすると、エージェントはパイプラインの失敗をトラブルシューティングするためのプロンプトを生成します。プロンプトはクリップボードにコピーされるか、新しいチャット セッションで開きます。
エージェントは、ログの収集、デプロイされたコードとワークスペースの相互チェック、根本原因分析(RCA)の生成に重点を置いて、特別なスキルを使用してパイプラインのトラブルシューティングを行います。
RCA を受け取った後の次のステップは次のとおりです。
- 現在のワークスペースで根本原因分析を適用します。
- エージェントに新しいブランチを作成して、変更を適用するように依頼します。
- RCA の詳細を含む Cloud カスタマーケア チケットを開きます。
拡張機能に関する問題のトラブルシューティングについては、 VS Code 用の Data Agent Kit 拡張機能のトラブルシューティングをご覧ください。