本指南說明如何使用 Visual Studio Code 適用的 Google Cloud Data Agent Kit 擴充功能,建立及部署協調管道。
範例管道會在 Managed Service for Apache Spark 中執行 PySpark 指令碼。
您可以從 VS Code 部署協調流程管道,做為本機版本或透過 GitHub 動作 (例如將變更合併至 main 分支版本時)。本文說明如何部署編排管道的本機版本。
事前準備
開始之前,請先完成下列操作:
- 安裝 VS Code 的 Data Agent Kit 擴充功能。
- 調整設定。
- 將 GitHub 存放區新增至 VS Code 工作區,以便儲存協調流程管道和資產 (例如指令碼)。
查看必要的 IAM 角色
如要取得在專案中建立資源、部署及執行 Orchestration Pipelines 的權限,請要求系統管理員授予您必要角色。
如要建立及管理 Managed Service for Apache Airflow 環境,並管理相關聯儲存空間中的物件,您需要下列角色。如要進一步瞭解這些使用者角色,請參閱 Managed Service for Apache Airflow 說明文件中的「授予使用者角色」。
- 環境與 Storage 物件管理員 (composer.environmentAndStorageObjectAdmin)
- 服務帳戶使用者
(
iam.serviceAccountUser)
如要使用 BigQuery 和 Cloud Storage 資源,您需要下列角色。
- BigQuery 資料編輯者 (
roles/bigquery.dataEditor) - Storage 物件管理員 (
roles/storage.objectAdmin)
視您打算存取的資源而定,您可能需要額外角色,才能使用擴充功能及處理協調流程管道。
建立服務帳戶並授予 IAM 角色
為 Managed Airflow 第 3 代環境使用專屬服務帳戶。服務帳戶會建立 Managed Airflow 第 3 代環境,並執行您部署的所有自動化調度管理管道。
請管理員完成下列步驟:
- 按照 IAM 說明文件中的說明建立服務帳戶。
- 將 Composer Worker (
composer.worker) 角色授予服務帳戶。這個角色在大多數情況下都會提供必要權限。
最佳做法是,如果需要存取Google Cloud 專案中的其他資源,請僅在編排管道作業需要時,才授予這項服務帳戶額外權限。
為自動化調度管理管道建立 Google Cloud 資源
在這個步驟中,請為協調管道建立 Google Cloud 資源。
建立代管 Airflow 第 3 代環境
建立 Managed Airflow 第 3 代環境,並使用下列設定:
- 環境名稱:輸入稍後用於設定協調管道的名稱。例如:
example-pipeline-scheduler。 - 位置:選取位置。建議您在本指南中建立的所有資源都位於相同位置。例如:
us-central1。 - 服務帳戶:選取您為這個環境建立的服務帳戶。
以下 Google Cloud CLI 指令範例會示範語法:
gcloud composer environments create example-pipeline-scheduler \
--location us-central1 \
--image-version composer-3-airflow-2 \
--service-account "example-account@example-project.iam.gserviceaccount.com"
在排程器設定中新增環境參數
請提供 Managed Airflow 環境的連線詳細資料,該環境將執行協調流程管道。
使用 Google Cloud Data Agent Kit 設定編輯器,新增您建立的環境設定參數:
- 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
- 展開「設定」,然後點選「設定」。
- 選取「排程器」。
- 輸入您先前建立的 Managed Airflow 第 3 代環境參數:
- 專案 ID:環境所在的專案名稱。範例:
example-project。 - 區域:環境所在的區域。例如:
us-central1。 - 環境:環境名稱。例如:
example-pipeline-scheduler。
- 專案 ID:環境所在的專案名稱。範例:
- 按一下 [儲存]。
建立管道構件的 bucket
在與 Managed Airflow 環境相同的專案中建立 Cloud Storage bucket,並將其命名為類似 example-pipelines-bucket 的名稱。這個值區必須用於儲存 Managed Service for Apache Spark 工作。
部分管道動作,例如將結果輸出至 Cloud Storage bucket。
在 BigQuery 中建立新的資料集和資料表
本指南示範如何建立管道,將資料寫入 BigQuery 資料表。在專案中建立下列 BigQuery 資源:
新增管道資產
本指南將示範如何使用 PySpark 執行常見的資料工程工作 (ETL:擷取、轉換、載入),從 BigQuery 讀取資料、轉換資料 (字數統計),然後將資料載入回 BigQuery。
非代理
將下列檔案新增至存放區的 /scripts 資料夾。稍後,您會在 Managed Service for Apache Spark 中新增執行這項指令碼的管道動作。
範例 wordcount.py 檔案:
#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)
# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')
# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()
# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()
print(f"Successfully wrote word counts to BigQuery table: {destination_table}")
更改下列內容:
- ARTIFACTS_BUCKET_NAME:您稍早建立的 Cloud Storage bucket 名稱。範例:
example-pipelines-bucket。 - PROJECT_ID:環境所在的專案名稱。範例:
example-project。
代理功能
提示 Agent 在存放區的 /scripts 資料夾中生成 PySpark 指令碼範例。稍後,您會在 Managed Service for Apache Spark 中新增執行這項指令碼的管道動作。
輸入類似下列的提示:
I want to create a PySpark script that does the following:
1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.
My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.
Save the resulting script to /scripts as wordcount.py
在存放區中初始化自動化調度管理管道
初始化協調管道時,VS Code 的 Data Agent Kit 擴充功能會建立包含下列項目的架構:
- 協調管道 YAML 檔案:包含排程但未定義動作的管道定義範例。
deployment.yaml:管道部署設定範例,定義管道的部署方式。這個檔案會說明 Managed Airflow 環境、構件值區,以及管道動作使用的任何其他資源,所需的設定。.github/workflows/deploy.yaml:設定 GitHub 動作,在您將變更合併至 GitHub 存放區的main分支版本時,部署管道。.github/workflows/validate.yaml:設定 GitHub 動作,在管道部署後驗證管道。
在本文件的後續步驟中,您會使用 VS Code 的 Data Agent Kit 擴充功能擴充這些定義,以便在本地建立及部署協調管道。
非代理
如要初始化自動化調度管理管道,請執行下列操作:
- 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
- 展開「資料工程」,然後按一下「Initialize orchestration pipeline」(初始化協調管道)。
- 輸入新協調管道的參數:
- 管道 ID:輸入管道的 ID。範例:
example-pipeline。 - Google Cloud 專案 ID:環境所在的專案名稱。範例:
example-project。 - 區域:環境所在的區域。例如:
us-central1。 - 環境 ID:您要用來開發的環境名稱。
範例:
dev/staging。 Scheduler Managed Service for Apache Airflow Environment:您要自動化調度管理管道的環境名稱。對於這份文件,請在這個參數中指定相同環境。
構件 Bucket:用於管線構件的 Bucket 名稱,不含
gs://前置字元。範例:example-pipelines-bucket。點選「下一步」。
按一下「初始化」。
指定要初始化管道的工作區。
代理功能
請 Agent 為存放區的協調管道建立架構。
輸入類似下列的提示:
Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.
The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.
The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.
Store pipeline artifacts in example-pipelines-bucket.
在存放區中初始化管道後,您就無法再次執行這項操作,因為新的支架會覆寫您所做的任何設定變更。如要新增 pipeline,請在專案中建立新的 pipeline 定義檔案,然後將這些檔案新增至部署設定。
將新工作新增至管道
由於初始管道設定沒有任何動作,因此您要新增一個動作,以便執行 PySpark 指令碼。
非代理
如要編輯管道,請按照下列步驟操作:
- 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
- 依序展開「資料工程」,然後展開「Orchestration Pipelines」。
- 選取「
example-pipeline.yaml」。系統會開啟所選管道的管道編輯器。 - 選用:選取「Schedule trigger」(排程觸發條件) 節點。您可以指定類似 Cron 的運算式,以及排程的開始和結束時間,藉此調整管道的排程。新初始化的管道預設排程為
0 2 * * *,每天凌晨 2 點執行。
新增工作。在本指南中,您會新增 PySpark 工作,執行先前新增的 PySpark 指令碼:
- 按一下「新增第一個工作」,新增工作節點。
- 選取「執行 PySpark 指令碼」和
script/wordcount.py檔案。
「執行 PySpark 指令碼」面板隨即開啟。
- 在「Spark Cluster Mode」(Spark 叢集模式) 中,選取「Serverless Spark」(無伺服器 Spark)。
- 在「位置」中,指定環境所在位置。
範例:
us-central1。 - 按一下 [儲存]。
代理功能
執行下列提示:
Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.
部署管道的本機版本
部署管道的本機版本,確認設定正確無誤。
部署自動化調度管理管道的本機版本時,VS Code 的 Data Agent Kit 擴充功能會將管道套件的本機版本上傳至 Managed Airflow 環境並執行。在本機部署時,請務必在開發環境中作業。
部署指令會部署未暫停的排程。為避免這種情況,您可以在「管道管理」窗格中手動暫停排程。您也可以編輯管道 YAML 檔案,註解或移除 triggers: - schedule 區塊。
非代理
如要部署範例協調管道的本機版本,請執行下列步驟:
- 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
- 依序展開「資料工程」和「Orchestration Pipelines」。
- 選取「
example-pipeline.yaml」。系統會開啟所選管道的管道編輯器。 - 選取「執行管道」,然後選取先前建立的開發或暫存環境。
代理功能
執行下列提示:
Deploy my pipeline
監控管道執行作業並檢查執行記錄
管道部署完成後,您可以查看詳細資訊、管道執行記錄和管道執行記錄:
- 按一下活動列中的「Google Cloud Data Agent Kit」圖示。
- 展開「資料工程」,然後選取「管道管理」。
- 按一下管道名稱 (
example-pipeline) 即可查看執行記錄。在特定日期的執行清單中,您可以查看個別管道執行作業,以及每個管道執行作業中個別動作的明細。 - 按一下工作 ID 即可查看工作執行記錄。由於範例 PySpark 指令碼是在 Managed Service for Apache Spark 中執行,因此工作記錄會提供批次記錄的連結。
排解及修正管道失敗問題
如果管道失敗,「管道管理」窗格中會顯示「診斷」按鈕。
代理
按一下「診斷」按鈕後,代理程式會產生提示,協助排解管道故障問題。提示會複製到剪貼簿,或在新即時通訊工作階段中開啟。
代理程式會運用專業技能排解管道問題,著重於收集記錄、交叉檢查已部署的程式碼和工作區,以及產生根本原因分析 (RCA)。
收到 RCA 後,可能的後續步驟如下:
- 在目前工作區套用根本原因分析。
- 請代理建立新分支,並在該分支套用變更。
- 開啟 Cloud Customer Care 服務單,並附上 RCA 詳細資料。
如需擴充功能問題的疑難排解說明,請參閱「排解 VS Code 適用的 Data Agent Kit 擴充功能問題」。