本指南說明如何在 Antigravity IDE 的 Google Cloud Data Agent Kit 擴充功能中,建立及部署協調管道。
範例管道會在 Managed Service for Apache Spark 中執行 PySpark 指令碼。
您可以從 Antigravity IDE 部署編排管道,做為本機版本,或透過 GitHub 動作 (例如將變更合併至 main 分支版本時) 部署。本文說明如何部署編排管道的本機版本。
事前準備
開始之前,請先完成下列操作:
- 為 Antigravity IDE 安裝 Data Agent Kit 擴充功能。
- 調整設定。
- 在 Antigravity IDE 工作區中新增 GitHub 存放區,儲存協調流程管道和指令碼等資產。
查看必要的 IAM 角色
如要取得在專案中建立資源、部署及執行 Orchestration Pipelines 的權限,請要求系統管理員授予您必要角色。
如要建立及管理 Managed Service for Apache Airflow 環境,並管理相關聯儲存空間中的物件,您需要下列角色。如要進一步瞭解這些使用者角色,請參閱 Managed Service for Apache Airflow 說明文件中的「授予使用者角色」。
- 環境與 Storage 物件管理員 (composer.environmentAndStorageObjectAdmin)
- 服務帳戶使用者
(
iam.serviceAccountUser)
如要使用 BigQuery 和 Cloud Storage 資源,您需要下列角色。
- BigQuery 資料編輯者 (
roles/bigquery.dataEditor) - Storage 物件管理員 (
roles/storage.objectAdmin)
視您打算存取的資源而定,您可能需要額外角色,才能使用擴充功能及處理協調流程管道。
建立服務帳戶並授予 IAM 角色
為 Managed Airflow 第 3 代環境使用專屬服務帳戶。服務帳戶會建立 Managed Airflow 第 3 代環境,並執行您部署的所有自動化調度管理管道。
請管理員完成下列步驟:
- 按照 IAM 說明文件中的說明建立服務帳戶。
- 將 Composer Worker (
composer.worker) 角色授予服務帳戶。這個角色在大多數情況下都會提供必要權限。
最佳做法是,如果需要存取Google Cloud 專案中的其他資源,請僅在編排管道作業需要時,才授予這項服務帳戶額外權限。
為自動化調度管理管道建立 Google Cloud 資源
在這個步驟中,請為協調管道建立 Google Cloud 資源。
建立代管 Airflow 第 3 代環境
建立 Managed Airflow 第 3 代環境,並使用下列設定:
- 環境名稱:輸入稍後用於設定協調管道的名稱。例如:
example-pipeline-scheduler。 - 位置:選取位置。建議您在本指南中建立的所有資源都位於相同位置。例如:
us-central1。 - 服務帳戶:選取您為這個環境建立的服務帳戶。
以下 Google Cloud CLI 指令範例會示範語法:
gcloud composer environments create example-pipeline-scheduler \
--location us-central1 \
--image-version composer-3-airflow-2 \
--service-account "example-account@example-project.iam.gserviceaccount.com"
在排程器設定中新增環境參數
請提供 Managed Airflow 環境的連線詳細資料,該環境將執行協調流程管道。
使用 Google Cloud Data Agent Kit 設定編輯器,新增您建立的環境設定參數:
- 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
- 展開「設定」,然後點選「設定」。
- 選取「排程器」。
- 輸入您先前建立的 Managed Airflow 第 3 代環境參數:
- 專案 ID:環境所在的專案名稱。範例:
example-project。 - 區域:環境所在的區域。例如:
us-central1。 - 環境:環境名稱。例如:
example-pipeline-scheduler。
- 專案 ID:環境所在的專案名稱。範例:
- 按一下 [儲存]。
建立管道構件的 bucket
在與 Managed Airflow 環境相同的專案中建立 Cloud Storage bucket,並將其命名為類似 example-pipelines-bucket 的名稱。這個值區必須用於儲存 Managed Service for Apache Spark 工作。
部分管道動作,例如將結果輸出至 Cloud Storage bucket。
在 BigQuery 中建立新的資料集和資料表
本指南示範如何建立管道,將資料寫入 BigQuery 資料表。在專案中建立下列 BigQuery 資源:
新增管道資產
本指南將示範如何使用 PySpark 執行常見的資料工程工作 (ETL:擷取、轉換、載入),從 BigQuery 讀取資料、轉換資料 (字數統計),然後將資料載入回 BigQuery。
非代理
將下列檔案新增至存放區的 /scripts 資料夾。稍後,您會在 Managed Service for Apache Spark 中新增管道動作,以執行這項指令碼。
範例 wordcount.py 檔案:
#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)
# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')
# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()
# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()
print(f"Successfully wrote word counts to BigQuery table: {destination_table}")
更改下列內容:
- ARTIFACTS_BUCKET_NAME:您稍早建立的 Cloud Storage bucket 名稱。範例:
example-pipelines-bucket。 - PROJECT_ID:環境所在的專案名稱。範例:
example-project。
代理
提示 Agent 在存放區的 /scripts 資料夾中生成 PySpark 指令碼範例。稍後,您會在 Managed Service for Apache Spark 中新增執行這項指令碼的管道動作。
輸入類似下列的提示:
I want to create a PySpark script that does the following:
1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.
My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.
Save the resulting script to /scripts as wordcount.py
在存放區中初始化自動化調度管理管道
初始化協調管道時,Antigravity IDE 的 Data Agent Kit 擴充功能會建立包含下列項目的支架:
- 協調管道 YAML 檔案:包含排程但未定義動作的管道定義範例。
deployment.yaml:管道部署設定範例,定義管道的部署方式。這個檔案會說明 Managed Airflow 環境、構件值區,以及管道動作使用的任何其他資源,所需的設定。.github/workflows/deploy.yaml:設定 GitHub 動作,在您將變更合併至 GitHub 存放區的main分支版本時,部署管道。.github/workflows/validate.yaml:設定 GitHub 動作,在管道部署後驗證管道。
在本文後續步驟中,您將使用 Antigravity IDE 的 Data Agent Kit 擴充功能擴充這些定義,在本機建立及部署協調程序管道。
非代理
如要初始化自動化調度管理管道,請執行下列操作:
- 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
- 展開「資料工程」,然後按一下「Initialize orchestration pipeline」(初始化協調管道)。
- 輸入新自動調度管道的參數:
- 管道 ID:輸入管道的 ID。範例:
example-pipeline。 - Google Cloud 專案 ID:環境所在的專案名稱。範例:
example-project。 - 區域:環境所在的區域。例如:
us-central1。 - 環境 ID:您要用來開發的環境名稱。
範例:
dev/staging。 Scheduler Managed Service for Apache Airflow 環境:您要自動調度管理管道的環境名稱。對於這份文件,請在這個參數中指定相同環境。
構件 Bucket:用於管線構件的 Bucket 名稱,不含
gs://前置字元。範例:example-pipelines-bucket。點選「下一步」。
按一下「初始化」。
指定要初始化管道的工作區。
代理
要求 Agent 為存放區的協調管道建立架構。
輸入類似下列的提示:
Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.
The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.
The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.
Store pipeline artifacts in example-pipelines-bucket.
在存放區中初始化管道後,您就無法再次執行這項操作,因為新的支架會覆寫您所做的任何設定變更。如要新增 pipeline,請在專案中建立新的 pipeline 定義檔案,然後將這些檔案新增至部署設定。
將新工作新增至管道
由於初始管道設定沒有任何動作,因此您要新增一個動作,以便執行 PySpark 指令碼。
非代理
如要編輯管道,請按照下列步驟操作:
- 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
- 依序展開「資料工程」,然後展開「Orchestration Pipelines」。
- 選取「
example-pipeline.yaml」。系統會開啟所選管道的管道編輯器。 - 選用:選取「Schedule trigger」(排程觸發條件) 節點。您可以指定類似 Cron 的運算式,以及排程的開始和結束時間,藉此調整管道的排程。新初始化的管道預設排程為
0 2 * * *,每天凌晨 2 點執行。
新增工作。在本指南中,您會新增 PySpark 工作,執行先前新增的 PySpark 指令碼:
- 按一下「新增第一個工作」,新增工作節點。
- 選取「執行 PySpark 指令碼」和
script/wordcount.py檔案。
「執行 PySpark 指令碼」面板隨即開啟。
- 在「Spark Cluster Mode」(Spark 叢集模式) 中,選取「Serverless Spark」(無伺服器 Spark)。
- 在「位置」中,指定環境所在位置。
範例:
us-central1。 - 按一下 [儲存]。
代理
執行下列提示:
Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.
部署管道的本機版本
部署管道的本機版本,確認設定正確無誤。
部署自動化調度管理管道的本機版本時,Antigravity IDE 的 Data Agent Kit 擴充功能會將管道套件的本機版本上傳至 Managed Airflow 環境並執行。在本機部署時,請務必在開發環境中作業。
部署指令會部署未暫停的排程。為避免這種情況,您可以在「管道管理」窗格中手動暫停排程。您也可以編輯管道 YAML 檔案,註解或移除 triggers: - schedule 區塊。
非代理
如要部署範例協調管道的本機版本,請執行下列步驟:
- 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
- 依序展開「資料工程」和「Orchestration Pipelines」。
- 選取「
example-pipeline.yaml」。系統會開啟所選管道的管道編輯器。 - 選取「執行管道」,然後選取先前建立的開發或暫存環境。
代理
執行下列提示:
Deploy my pipeline
監控管道執行作業並檢查執行記錄
管道部署完成後,您可以查看詳細資訊、管道執行記錄和管道執行記錄:
- 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
- 展開「資料工程」,然後選取「管道管理」。
- 按一下管道名稱 (
example-pipeline) 即可查看執行記錄。在特定日期的執行清單中,您可以查看個別管道執行作業,以及每個管道執行作業中個別動作的明細。 - 按一下工作 ID 即可查看工作執行記錄。由於範例 PySpark 指令碼是在 Managed Service for Apache Spark 中執行,因此工作記錄會提供批次記錄的連結。
排解及修正管道失敗問題
如果管道失敗,「管道管理」窗格中會顯示「診斷」按鈕。
代理
按一下「診斷」按鈕後,代理程式會產生提示,協助排解管道故障問題。提示會複製到剪貼簿,或在新即時通訊工作階段中開啟。
代理程式會運用專業技能排解管道問題,著重於收集記錄、交叉檢查已部署的程式碼和工作區,以及產生根本原因分析 (RCA)。
收到 RCA 後,可能的後續步驟如下:
- 在目前工作區套用根本原因分析。
- 請代理建立新分支,並在該分支套用變更。
- 開啟 Cloud Customer Care 服務單,並附上 RCA 詳細資料。
如需擴充功能問題的疑難排解說明,請參閱「疑難排解」。