データ エンジニアリング パイプラインを構築する

このガイドでは、Antigravity 用の Google Cloud Data Agent Kit 拡張機能でオーケストレーション パイプラインを作成してデプロイする方法について説明します。

このパイプラインの例では、Managed Service for Apache Spark で PySpark スクリプトを実行します。

Antigravity からオーケストレーション パイプラインをローカル バージョンとしてデプロイするか、main ブランチへの変更をマージするときなど、GitHub アクションを介してデプロイできます。このドキュメントでは、オーケストレーション パイプラインのローカル バージョンをデプロイする方法について説明します。

始める前に

始める前に、次のことを行ってください。

  1. Antigravity 用の Data Agent Kit 拡張機能をインストールします
  2. 設定を構成します
  3. GitHub リポジトリを Antigravity ワークスペースに追加して、オーケストレーション パイプラインと、スクリプトなどのアセットを保存します。

必要な IAM ロールを確認する

プロジェクトでリソースを作成し、オーケストレーション パイプラインをデプロイして実行する権限を取得するには、必要なロールを付与するよう管理者に依頼してください。

Managed Service for Apache Airflow 環境を作成して管理し、関連付けられたバケット内のオブジェクトを管理するには、次のロールが必要です。これらのユーザーロールの詳細については、Managed Service for Apache Airflow のドキュメントのユーザーにロールを付与するをご覧ください。

  • 環境とストレージ オブジェクトの管理者 (composer.environmentAndStorageObjectAdmin)
  • サービス アカウント ユーザーiam.serviceAccountUser

BigQuery と Cloud Storage のリソースを操作するには、次のロールが必要です。

  • BigQuery データ編集者roles/bigquery.dataEditor
  • Storage オブジェクト管理者roles/storage.objectAdmin

アクセスするリソースによっては、拡張機能を使用してオーケストレーション パイプラインを操作できるロールに加えて、追加のロールが必要になる場合があります。

サービス アカウントを作成して IAM ロールを付与する

Managed Airflow Gen 3 環境に一意のサービス アカウントを使用します。サービス アカウントは、Managed Airflow Gen 3 環境を作成し、デプロイするすべてのオーケストレーション パイプラインを実行します。

管理者に次の手順を完了するよう依頼してください。

  1. IAM ドキュメントの説明に沿って、サービス アカウントを作成します
  2. サービス アカウントに Composer ワーカーcomposer.worker)ロールを付与します。ほとんどの場合、このロールには必要な権限が付与されています。

ベスト プラクティスとして、Google Cloud プロジェクト内の他のリソースにアクセスする必要がある場合は、オーケストレーション パイプラインのオペレーションに必要な場合にのみ、このサービス アカウントに追加の権限を付与します。

オーケストレーション パイプラインの Google Cloud リソースを作成する

このステップでは、オーケストレーション パイプラインの Google Cloud リソースを作成します。

Managed Airflow Gen 3 環境を作成する

次の構成で Managed Airflow Gen 3 環境を作成します。

  • 環境名: 後でオーケストレーション パイプラインの構成に使用する名前を入力します。例: example-pipeline-scheduler
  • ロケーション: ロケーションを選択します。このガイドのすべてのリソースを同じロケーションに作成することをおすすめします。例: us-central1
  • サービス アカウント: この環境用に作成したサービス アカウントを選択します。

次の Google Cloud CLI コマンドの例は、構文を示しています。

gcloud composer environments create example-pipeline-scheduler \
  --location us-central1 \
  --image-version composer-3-airflow-2 \
  --service-account "example-account@example-project.iam.gserviceaccount.com"

スケジューラ構成に環境パラメータを追加する

オーケストレーション パイプラインを実行する Managed Airflow 環境の接続の詳細を指定します。

Google Cloud Data Agent Kit 設定エディタを使用して、作成した環境の構成パラメータを追加します。

  1. アクティビティ バーの Google Cloud Data Agent Kit アイコンをクリックします。
  2. [設定] を展開し、[設定] をクリックします。
  3. [スケジューラ] を選択します。
  4. 先ほど作成した Managed Airflow Gen 3 環境のパラメータを入力します。
    • プロジェクト ID: 環境が配置されているプロジェクトの名前。例: example-project
    • リージョン: 環境が配置されているリージョン。us-central1
    • 環境: 環境の名前。example-pipeline-scheduler
  5. [保存] をクリックします。

パイプライン アーティファクト用のバケットを作成する

Managed Airflow 環境と同じプロジェクトに Cloud Storage バケットを作成し、example-pipelines-bucket と同様の名前を付けます。このバケットは、Managed Service for Apache Spark ジョブを保存するために必要です。

一部のパイプライン アクション(結果を Cloud Storage バケットに出力するなど)。

BigQuery で新しいデータセットとテーブルを作成する

このガイドでは、BigQuery テーブルにデータを書き込むパイプラインについて説明します。プロジェクトに次の BigQuery リソースを作成します。

  1. wordcount_dataset という名前の新しいデータセットを作成します。
  2. wordcount_output という名前の 新しい BigQuery テーブルを作成します。

パイプライン アセットを追加する

このガイドでは、PySpark を使用して一般的なデータ エンジニアリング タスク(ETL: 抽出、変換、読み込み)を実行する方法について説明します。BigQuery からデータを読み取り、データを変換(単語数)して、BigQuery に読み込みます。

非エージェント型

次のファイルをリポジトリの /scripts フォルダに追加します。このスクリプトを Managed Service for Apache Spark で実行するパイプライン アクションは、後で追加します。

wordcount.py ファイルの例:

#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)

# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')

# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
    'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()

# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()

print(f"Successfully wrote word counts to BigQuery table: {destination_table}")

次のように置き換えます。

  • ARTIFACTS_BUCKET_NAME: 前に作成した Cloud Storage バケットの名前。例: example-pipelines-bucket
  • PROJECT_ID: 環境が存在するプロジェクトの名前。例: example-project

エージェント

エージェントに、リポジトリの /scripts フォルダにサンプル PySpark スクリプトを生成するように指示します。後で、Managed Service for Apache Spark でこのスクリプトを実行するパイプライン アクションを追加します。

次のようなプロンプトを入力します。

I want to create a PySpark script that does the following:

1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.

My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.

Save the resulting script to /scripts as wordcount.py

リポジトリでオーケストレーション パイプラインを初期化する

オーケストレーション パイプラインを初期化すると、Antigravity 用の Data Agent Kit 拡張機能によって、次のものを含むスキャフォールディングが作成されます。

  • オーケストレーション パイプラインの YAML ファイル: スケジュールは含まれているが、アクションが定義されていないパイプライン定義の例。
  • deployment.yaml: パイプラインのデプロイ方法を定義するパイプライン デプロイ構成の例。このファイルは、Managed Airflow 環境、アーティファクト バケット、パイプライン アクションで使用されるその他のリソースに必要な構成を示しています。
  • .github/workflows/deploy.yaml: GitHub リポジトリの main ブランチに変更をマージしたときにパイプラインをデプロイする GitHub アクションを設定します。
  • .github/workflows/validate.yaml: パイプラインのデプロイ後に検証する GitHub アクションを設定します。

このドキュメントの後の手順では、Antigravity 用の Data Agent Kit 拡張機能を使用してこれらの定義を拡張し、オーケストレーション パイプラインをローカルで作成してデプロイします。

非エージェント型

オーケストレーション パイプラインを初期化する手順は次のとおりです。

  1. アクティビティ バーの Google Cloud Data Agent Kit アイコンをクリックします。
  2. [データ エンジニアリング] を開いて、[オーケストレーション パイプラインを初期化] をクリックします。
  3. 新しいオーケストレーション パイプラインのパラメータを入力します。
  4. パイプライン ID: パイプラインの ID を入力します。例: example-pipeline
  5. Google Cloud プロジェクト ID: 環境が存在するプロジェクトの名前。例: example-project
  6. リージョン: 環境が存在するリージョン。us-central1
  7. 環境 ID: 開発に使用する環境の名前。例: dev/staging
  8. Scheduler Managed Service for Apache Airflow Environment: パイプラインをオーケストレートする環境の名前。このドキュメントでは、このパラメータで同じ環境を指定します。

  9. アーティファクト バケット: パイプライン アーティファクトに使用されるバケットの名前(gs:// 接頭辞は付けない)。例: example-pipelines-bucket

  10. [次へ] をクリックします。

  11. [Initialize] をクリックします。

  12. パイプラインを初期化するワークスペースを指定します。

エージェント

エージェントに、リポジトリのオーケストレーション パイプラインのスキャフォールディングを作成するように依頼します。

次のようなプロンプトを入力します。

Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.

The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.

The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.

Store pipeline artifacts in example-pipelines-bucket.

リポジトリでパイプラインを初期化した後、新しいスキャフォールディングによって行った構成の変更が上書きされるため、再度初期化することはできません。新しいパイプラインを追加するには、プロジェクトに新しいパイプライン定義ファイルを作成し、デプロイ構成に追加します。

パイプラインに新しいタスクを追加する

初期パイプライン構成にはアクションがないため、PySpark スクリプトを実行するアクションを追加します。

非エージェント型

パイプラインを編集する手順は次のとおりです。

  1. アクティビティ バーの Google Cloud Data Agent Kit アイコンをクリックします。
  2. [データ エンジニアリング]、[Orchestration Pipelines] の順に展開します。
  3. [example-pipeline.yaml] を選択します。選択したパイプラインのパイプライン エディタが開きます。
  4. 省略可: [スケジュール トリガー] ノードを選択します。cron 式とスケジュールの開始時刻と終了時刻を指定して、パイプラインのスケジュールを調整できます。新しく初期化されたパイプラインのデフォルトのスケジュールは 0 2 * * * で、毎日午前 2 時に実行されます。
  1. 新しいタスクを追加します。このガイドでは、先ほど追加した PySpark スクリプトを実行する PySpark タスクを追加します。

    1. [最初のアクティビティを追加] をクリックして、新しいタスクノードを追加します。
    2. [PySpark スクリプトを実行] と script/wordcount.py ファイルを選択します。

    [PySpark スクリプトを実行] パネルが開きます。

    1. [Spark クラスタモード] で、[サーバーレス Spark] を選択します。
    2. [ロケーション] で、環境が存在するロケーションを指定します。例: us-central1
    3. [保存] をクリックします。

エージェント

次のプロンプトを実行します。

Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.

パイプラインのローカル バージョンをデプロイする

パイプラインのローカル バージョンをデプロイして、正しく構成されていることを確認します。

オーケストレーション パイプラインのローカル バージョンをデプロイすると、Antigravity 用 Data Agent Kit 拡張機能は、パイプライン バンドルのローカル バージョンを Managed Airflow 環境にアップロードして実行します。ローカル デプロイは、開発環境で作業する場合に使用することを目的としています。

deploy コマンドは、一時停止されていないスケジュールをデプロイします。これを防ぐには、[パイプラインの管理] ペインでスケジュールを手動で一時停止します。パイプライン YAML ファイルを編集して、triggers: - schedule ブロックをコメントアウトまたは削除することもできます。

非エージェント型

サンプル オーケストレーション パイプラインのローカル バージョンをデプロイする手順は次のとおりです。

  1. アクティビティ バーの Google Cloud Data Agent Kit アイコンをクリックします。
  2. [データ エンジニアリング]、[Orchestration Pipelines] の順に開きます。
  3. [example-pipeline.yaml] を選択します。選択したパイプラインのパイプライン エディタが開きます。
  4. [パイプラインを実行] を選択し、前に作成した開発環境またはステージング環境を選択します。

エージェント

次のプロンプトを実行します。

Deploy my pipeline

パイプラインの実行をモニタリングし、実行ログを確認する

パイプラインをデプロイすると、詳細情報、パイプライン実行の履歴、パイプライン実行ログを確認できます。

  1. アクティビティ バーの Google Cloud Data Agent Kit アイコンをクリックします。
  2. [データ エンジニアリング] を開いて、[パイプライン管理] を選択します。
  3. パイプラインの名前(example-pipeline)をクリックして、実行履歴を表示します。特定の日付の実行リストには、個々のパイプライン実行と、各パイプライン実行内の個々のアクションの内訳が表示されます。
  4. タスク ID をクリックして、タスク実行ログを表示します。この例の PySpark スクリプトは Managed Service for Apache Spark で実行されたため、タスクログには Batch ログへのリンクが含まれます。

パイプラインの障害のトラブルシューティングと修正

パイプラインが失敗すると、[パイプライン管理] ペインに [診断] ボタンが表示されます。

エージェント

[診断] ボタンをクリックすると、エージェントはパイプラインの障害のトラブルシューティングを行うためのプロンプトを生成します。プロンプトがクリップボードにコピーされるか、新しいチャット セッションで開きます。

エージェントは、ログの収集、デプロイされたコードとワークスペースの相互チェック、根本原因分析(RCA)の生成に重点を置き、パイプラインのトラブルシューティングに特化したスキルを使用します。

RCA を受け取った後の次のステップは次のとおりです。

  • 現在のワークスペースで根本原因分析を適用します。
  • エージェントに新しいブランチを作成して、そこに変更を適用するよう依頼します。
  • RCA の詳細を記載した Cloud カスタマーケア チケットを開きます。

拡張機能に関する問題のトラブルシューティングについては、Antigravity 用 Data Agent Kit 拡張機能のトラブルシューティングをご覧ください。

次のステップ