建構資料工程管道

本指南說明如何在 Antigravity IDE 的 Google Cloud Data Agent Kit 擴充功能中,建立及部署協調管道。

範例管道會在 Managed Service for Apache Spark 中執行 PySpark 指令碼。

您可以從 Antigravity IDE 部署編排管道,做為本機版本,或透過 GitHub 動作 (例如將變更合併至 main 分支版本時) 部署。本文說明如何部署編排管道的本機版本。

事前準備

開始之前,請先完成下列操作:

  1. 為 Antigravity IDE 安裝 Data Agent Kit 擴充功能
  2. 調整設定
  3. 在 Antigravity IDE 工作區中新增 GitHub 存放區,儲存協調流程管道和指令碼等資產。

查看必要的 IAM 角色

如要取得在專案中建立資源、部署及執行 Orchestration Pipelines 的權限,請要求系統管理員授予您必要角色。

如要建立及管理 Managed Service for Apache Airflow 環境,並管理相關聯儲存空間中的物件,您需要下列角色。如要進一步瞭解這些使用者角色,請參閱 Managed Service for Apache Airflow 說明文件中的「授予使用者角色」。

  • 環境與 Storage 物件管理員 (composer.environmentAndStorageObjectAdmin)
  • 服務帳戶使用者 (iam.serviceAccountUser)

如要使用 BigQuery 和 Cloud Storage 資源,您需要下列角色。

  • BigQuery 資料編輯者 (roles/bigquery.dataEditor)
  • Storage 物件管理員 (roles/storage.objectAdmin)

視您打算存取的資源而定,您可能需要額外角色,才能使用擴充功能及處理協調流程管道。

建立服務帳戶並授予 IAM 角色

為 Managed Airflow 第 3 代環境使用專屬服務帳戶。服務帳戶會建立 Managed Airflow 第 3 代環境,並執行您部署的所有自動化調度管理管道。

請管理員完成下列步驟:

  1. 按照 IAM 說明文件中的說明建立服務帳戶
  2. Composer Worker (composer.worker) 角色授予服務帳戶。這個角色在大多數情況下都會提供必要權限。

最佳做法是,如果需要存取Google Cloud 專案中的其他資源,請僅在編排管道作業需要時,才授予這項服務帳戶額外權限。

為自動化調度管理管道建立 Google Cloud 資源

在這個步驟中,請為協調管道建立 Google Cloud 資源。

建立代管 Airflow 第 3 代環境

建立 Managed Airflow 第 3 代環境,並使用下列設定:

  • 環境名稱:輸入稍後用於設定協調管道的名稱。例如:example-pipeline-scheduler
  • 位置:選取位置。建議您在本指南中建立的所有資源都位於相同位置。例如:us-central1
  • 服務帳戶:選取您為這個環境建立的服務帳戶。

以下 Google Cloud CLI 指令範例會示範語法:

gcloud composer environments create example-pipeline-scheduler \
  --location us-central1 \
  --image-version composer-3-airflow-2 \
  --service-account "example-account@example-project.iam.gserviceaccount.com"

在排程器設定中新增環境參數

請提供 Managed Airflow 環境的連線詳細資料,該環境將執行協調流程管道。

使用 Google Cloud Data Agent Kit 設定編輯器,新增您建立的環境設定參數:

  1. 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
  2. 展開「設定」,然後點選「設定」
  3. 選取「排程器」
  4. 輸入您先前建立的 Managed Airflow 第 3 代環境參數:
    • 專案 ID:環境所在的專案名稱。範例:example-project
    • 區域:環境所在的區域。例如:us-central1
    • 環境:環境名稱。例如:example-pipeline-scheduler
  5. 按一下 [儲存]

建立管道構件的 bucket

在與 Managed Airflow 環境相同的專案中建立 Cloud Storage bucket,並將其命名為類似 example-pipelines-bucket 的名稱。這個值區必須用於儲存 Managed Service for Apache Spark 工作。

部分管道動作,例如將結果輸出至 Cloud Storage bucket。

在 BigQuery 中建立新的資料集和資料表

本指南示範如何建立管道,將資料寫入 BigQuery 資料表。在專案中建立下列 BigQuery 資源:

  1. 建立名為 wordcount_dataset 的新資料集
  2. 建立名為 wordcount_output 的新 BigQuery 資料表

新增管道資產

本指南將示範如何使用 PySpark 執行常見的資料工程工作 (ETL:擷取、轉換、載入),從 BigQuery 讀取資料、轉換資料 (字數統計),然後將資料載入回 BigQuery。

非代理

將下列檔案新增至存放區的 /scripts 資料夾。稍後,您會在 Managed Service for Apache Spark 中新增管道動作,以執行這項指令碼。

範例 wordcount.py 檔案:

#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)

# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')

# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
    'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()

# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()

print(f"Successfully wrote word counts to BigQuery table: {destination_table}")

更改下列內容:

  • ARTIFACTS_BUCKET_NAME:您稍早建立的 Cloud Storage bucket 名稱。範例:example-pipelines-bucket
  • PROJECT_ID:環境所在的專案名稱。範例:example-project

代理

提示 Agent 在存放區的 /scripts 資料夾中生成 PySpark 指令碼範例。稍後,您會在 Managed Service for Apache Spark 中新增執行這項指令碼的管道動作。

輸入類似下列的提示:

I want to create a PySpark script that does the following:

1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.

My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.

Save the resulting script to /scripts as wordcount.py

在存放區中初始化自動化調度管理管道

初始化協調管道時,Antigravity IDE 的 Data Agent Kit 擴充功能會建立包含下列項目的支架:

  • 協調管道 YAML 檔案:包含排程但未定義動作的管道定義範例。
  • deployment.yaml:管道部署設定範例,定義管道的部署方式。這個檔案會說明 Managed Airflow 環境、構件值區,以及管道動作使用的任何其他資源,所需的設定。
  • .github/workflows/deploy.yaml:設定 GitHub 動作,在您將變更合併至 GitHub 存放區的 main 分支版本時,部署管道。
  • .github/workflows/validate.yaml:設定 GitHub 動作,在管道部署後驗證管道。

在本文後續步驟中,您將使用 Antigravity IDE 的 Data Agent Kit 擴充功能擴充這些定義,在本機建立及部署協調程序管道。

非代理

如要初始化自動化調度管理管道,請執行下列操作:

  1. 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
  2. 展開「資料工程」,然後按一下「Initialize orchestration pipeline」(初始化協調管道)
  3. 輸入新自動調度管道的參數:
  4. 管道 ID:輸入管道的 ID。範例:example-pipeline
  5. Google Cloud 專案 ID:環境所在的專案名稱。範例:example-project
  6. 區域:環境所在的區域。例如:us-central1
  7. 環境 ID:您要用來開發的環境名稱。 範例:dev/staging
  8. Scheduler Managed Service for Apache Airflow 環境:您要自動調度管理管道的環境名稱。對於這份文件,請在這個參數中指定相同環境。

  9. 構件 Bucket:用於管線構件的 Bucket 名稱,不含 gs:// 前置字元。範例:example-pipelines-bucket

  10. 點選「下一步」

  11. 按一下「初始化」

  12. 指定要初始化管道的工作區。

代理

要求 Agent 為存放區的協調管道建立架構。

輸入類似下列的提示:

Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.

The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.

The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.

Store pipeline artifacts in example-pipelines-bucket.

在存放區中初始化管道後,您就無法再次執行這項操作,因為新的支架會覆寫您所做的任何設定變更。如要新增 pipeline,請在專案中建立新的 pipeline 定義檔案,然後將這些檔案新增至部署設定。

將新工作新增至管道

由於初始管道設定沒有任何動作,因此您要新增一個動作,以便執行 PySpark 指令碼。

非代理

如要編輯管道,請按照下列步驟操作:

  1. 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
  2. 依序展開「資料工程」,然後展開「Orchestration Pipelines」
  3. 選取「example-pipeline.yaml」。系統會開啟所選管道的管道編輯器。
  4. 選用:選取「Schedule trigger」(排程觸發條件) 節點。您可以指定類似 Cron 的運算式,以及排程的開始和結束時間,藉此調整管道的排程。新初始化的管道預設排程為 0 2 * * *,每天凌晨 2 點執行。
  1. 新增工作。在本指南中,您會新增 PySpark 工作,執行先前新增的 PySpark 指令碼:

    1. 按一下「新增第一個工作」,新增工作節點。
    2. 選取「執行 PySpark 指令碼」script/wordcount.py 檔案。

    「執行 PySpark 指令碼」面板隨即開啟。

    1. 在「Spark Cluster Mode」(Spark 叢集模式) 中,選取「Serverless Spark」(無伺服器 Spark)
    2. 在「位置」中,指定環境所在位置。 範例:us-central1
    3. 按一下 [儲存]

代理

執行下列提示:

Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.

部署管道的本機版本

部署管道的本機版本,確認設定正確無誤。

部署自動化調度管理管道的本機版本時,Antigravity IDE 的 Data Agent Kit 擴充功能會將管道套件的本機版本上傳至 Managed Airflow 環境並執行。在本機部署時,請務必在開發環境中作業。

部署指令會部署未暫停的排程。為避免這種情況,您可以在「管道管理」窗格中手動暫停排程。您也可以編輯管道 YAML 檔案,註解或移除 triggers: - schedule 區塊。

非代理

如要部署範例協調管道的本機版本,請執行下列步驟:

  1. 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
  2. 依序展開「資料工程」和「Orchestration Pipelines」
  3. 選取「example-pipeline.yaml」。系統會開啟所選管道的管道編輯器。
  4. 選取「執行管道」,然後選取先前建立的開發或暫存環境。

代理

執行下列提示:

Deploy my pipeline

監控管道執行作業並檢查執行記錄

管道部署完成後,您可以查看詳細資訊、管道執行記錄和管道執行記錄:

  1. 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
  2. 展開「資料工程」,然後選取「管道管理」
  3. 按一下管道名稱 (example-pipeline) 即可查看執行記錄。在特定日期的執行清單中,您可以查看個別管道執行作業,以及每個管道執行作業中個別動作的明細。
  4. 按一下工作 ID 即可查看工作執行記錄。由於範例 PySpark 指令碼是在 Managed Service for Apache Spark 中執行,因此工作記錄會提供批次記錄的連結。

排解及修正管道失敗問題

如果管道失敗,「管道管理」窗格中會顯示「診斷」按鈕。

代理

按一下「診斷」按鈕後,代理程式會產生提示,協助排解管道故障問題。提示會複製到剪貼簿,或在新即時通訊工作階段中開啟。

代理程式會運用專業技能排解管道問題,著重於收集記錄、交叉檢查已部署的程式碼和工作區,以及產生根本原因分析 (RCA)。

收到 RCA 後,可能的後續步驟如下:

  • 在目前工作區套用根本原因分析。
  • 請代理建立新分支,並在該分支套用變更。
  • 開啟 Cloud Customer Care 服務單,並附上 RCA 詳細資料。

如需擴充功能問題的疑難排解說明,請參閱「疑難排解」。

後續步驟