本指南介绍了如何在 Visual Studio Code 的 Google Cloud Data Agent Kit 扩展程序中创建和部署编排流水线。
示例流水线在 Managed Service for Apache Spark 中运行 PySpark 脚本。
您可以从 VS Code 将编排流水线部署为本地版本,也可以通过 GitHub 操作(例如在将更改合并到 main 分支时)进行部署。本文档演示了如何部署编排流水线的本地版本。
准备工作
在开始之前,请完成以下操作:
- 安装适用于 VS Code 的 Data Agent Kit 扩展程序。
- 配置设置。
- 将 GitHub 代码库添加到 VS Code 工作区,以存储 Orchestration Pipelines 和脚本等资源。
查看所需的 IAM 角色
如需获得在项目中创建资源、部署和运行 Orchestration Pipelines 的权限,请让管理员为您授予所需的角色。
如需创建和管理 Managed Service for Apache Airflow 环境并管理其关联存储分区中的对象,您需要以下角色。如需详细了解这些用户角色,请参阅Managed Service for Apache Airflow 文档中的 为用户授予角色。
- Environment and Storage Object Administrator (composer.environmentAndStorageObjectAdmin)
- Service Account User
(
iam.serviceAccountUser)
如需使用 BigQuery 和 Cloud Storage 资源,您需要以下角色。
- BigQuery Data Editor (
roles/bigquery.dataEditor) - Storage Object Admin (
roles/storage.objectAdmin)
根据您计划访问的资源,除了允许您使用扩展程序和使用编排流水线的角色之外,您可能还需要其他角色。
创建服务帐号并为其授予 IAM 角色
为 Managed Airflow Gen 3 环境使用唯一的服务帐号。 该服务帐号会创建 Managed Airflow Gen 3 环境并运行您部署的所有编排流水线。
请让管理员完成以下步骤:
- 按照 IAM 文档中的说明创建服务账号。
- 向服务账号授予 Composer Worker (
composer.worker) 角色。在大多数情况下,此角色提供所需的权限。
作为最佳实践,如果您需要访问项目中的其他资源,请仅在 Orchestration Pipelines 操作需要时才向此服务账号授予其他权限。Google Cloud
为编排流水线创建 Google Cloud 资源
在此步骤中,创建 Google Cloud 资源,用于编排 流水线。
创建 Managed Airflow Gen 3 环境
使用以下配置创建 Managed Airflow Gen 3 环境:
- 环境名称:输入您稍后将用于配置
编排流水线的名称。例如,
example-pipeline-scheduler。 - 位置:选择一个位置。我们建议您在本指南中在同一位置创建所有资源。例如,
us-central1。 - 服务账号:选择您为此 环境创建的服务帐号。
以下 Google Cloud CLI 命令示例演示了语法:
gcloud composer environments create example-pipeline-scheduler \
--location us-central1 \
--image-version composer-3-airflow-2 \
--service-account "example-account@example-project.iam.gserviceaccount.com"
向调度器配置添加环境参数
提供将执行编排流水线的 Managed Airflow 环境的连接详细信息。
使用 Google Cloud Data Agent Kit 设置编辑器添加您创建的环境的配置参数:
- 点击活动栏中的 Google Cloud Data Agent Kit 图标。
- 展开设置 ,然后点击设置 。
- 选择调度器 。
- 输入您之前创建的 Managed Airflow Gen 3 环境的参数:
- 项目 ID:环境所在项目的名称。
示例:
example-project。 - 区域:环境所在的区域。示例:
us-central1。 - 环境:环境的名称。示例:
example-pipeline-scheduler。
- 项目 ID:环境所在项目的名称。
示例:
- 点击保存 。
为流水线工件创建存储桶
在与
Managed Airflow 环境相同的项目中创建一个 Cloud Storage 存储桶,并为其指定类似于
example-pipelines-bucket的名称。此存储桶是存储 Managed Service for Apache Spark 作业所必需的。
某些流水线操作(例如将结果输出到 Cloud Storage 存储桶)。
在 BigQuery 中创建新的数据集和表
本指南演示了一个将数据写入 BigQuery 表的流水线。在项目中创建以下 BigQuery 资源:
- 创建一个新数据集,名为
wordcount_dataset。 - 创建一个新的 BigQuery 表,名为
wordcount_output。
添加流水线资源
本指南演示了使用 PySpark 执行的常见数据工程任务(ETL:提取、转换、加载),即从 BigQuery 读取数据,转换数据(字数统计),然后将其加载回 BigQuery。
非智能体
将以下文件添加到代码库的 /scripts 文件夹。您稍后会添加一个流水线操作,该操作会在 Managed Service for Apache Spark 中运行此脚本。
wordcount.py 文件示例:
#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)
# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')
# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()
# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()
print(f"Successfully wrote word counts to BigQuery table: {destination_table}")
替换以下内容:
- ARTIFACTS_BUCKET_NAME:您之前创建的 Cloud Storage 存储桶的名称。示例:
example-pipelines-bucket。 - PROJECT_ID:环境
所在项目的名称。示例:
example-project。
智能体
提示智能体在代码库的 /scripts 文件夹中生成示例 PySpark 脚本。您稍后会添加一个流水线操作,该操作会在 Managed Service for Apache Spark 中运行此脚本。
输入类似于以下内容的提示:
I want to create a PySpark script that does the following:
1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.
My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.
Save the resulting script to /scripts as wordcount.py
在代码库中初始化编排流水线
初始化 Orchestration Pipelines 时,VS Code 的 Data Agent Kit 扩展程序会创建一个基架,其中包含以下内容:
- 编排流水线 YAML 文件:包含时间表但未定义操作的示例流水线定义。
deployment.yaml:示例流水线部署配置,用于定义流水线的部署方式。此文件演示了 Managed Airflow 环境、工件存储桶以及流水线操作使用的任何其他资源的所需配置。.github/workflows/deploy.yaml:设置 GitHub 操作,以便在您将更改合并到 GitHub 代码库的main分支时部署流水线。.github/workflows/validate.yaml:设置 GitHub 操作,以便在流水线部署后对其进行验证。
在本文档的后续步骤中,您将使用 VS Code 的 Data Agent Kit 扩展程序展开这些定义,以在本地创建和部署编排流水线。
非智能体
如需初始化编排流水线,请执行以下操作:
- 点击活动栏中的 Google Cloud Data Agent Kit 图标。
- 展开数据工程 ,然后点击初始化编排 流水线 。
- 输入新编排流水线的参数:
- 流水线 ID:输入流水线的 ID。示例:
example-pipeline。 - Google Cloud 项目 ID:环境
所在项目的名称。示例:
example-project。 - 区域:环境所在的区域。示例:
us-central1。 - 环境 ID:您要用于开发的环境的名称。
示例:
dev/staging。 调度器 Managed Service for Apache Airflow 环境: 您要在其中编排流水线的环境的名称。对于本文档,请在此参数中指定相同的环境。
工件存储桶:用于流水线工件的存储桶的名称, 不带
gs://前缀。示例:example-pipelines-bucket。点击下一步 。
点击初始化 。
指定您希望在其中初始化流水线的工作区。
智能体
让智能体为代码库的编排流水线创建脚手架。
输入类似于以下内容的提示:
Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.
The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.
The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.
Store pipeline artifacts in example-pipelines-bucket.
在代码库中初始化流水线后,您无法再次执行此操作,因为新的基架会覆盖您所做的任何配置更改。您可以在项目中创建新的流水线定义文件,并将其添加到部署配置中,以添加新的流水线。
向流水线添加新任务
由于初始流水线配置没有任何操作,因此您需要向其添加一个运行 PySpark 脚本的操作。
非智能体
如需修改流水线,请执行以下操作:
- 点击活动栏中的 Google Cloud Data Agent Kit 图标。
- 依次展开数据工程 和编排流水线 。
- 选择
example-pipeline.yaml。系统会为所选流水线打开流水线编辑器。 - 可选:选择时间表触发器 节点。您可以通过指定类似 Cron 的表达式以及时间表的开始时间和结束时间来调整流水线的时间表。新初始化的流水线的默认时间表为
0 2 * * *,每天凌晨 2 点运行。
添加新任务。在本指南中,您将添加一个 PySpark 任务,该任务运行您之前添加的 PySpark 脚本:
- 点击添加第一个任务 以添加新任务节点。
- 选择执行 PySpark 脚本 和
script/wordcount.py文件。
系统会打开执行 PySpark 脚本 面板。
- 在 Spark 集群模式下,选择 Serverless Spark 。
- 在位置中,指定环境所在的位置。
示例:
us-central1。 - 点击保存 。
智能体
运行以下提示:
Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.
部署流水线的本地版本
部署流水线的本地版本,以确认其配置正确。
部署编排流水线的本地版本时,VS Code 的 Data Agent Kit 扩展程序会将流水线软件包的本地版本上传到 Managed Airflow 环境并运行它。本地部署旨在用于在开发环境中工作时。
部署命令会部署未暂停的时间表。为防止这种情况,您可以在“流水线管理”窗格中手动暂停时间表。您还可以修改流水线 YAML 文件,以注释掉或移除 triggers: - schedule 块。
非智能体
如需部署示例编排流水线的本地版本,请执行以下操作:
- 点击活动栏中的 Google Cloud Data Agent Kit 图标。
- 依次展开数据工程 和Orchestration Pipelines 。
- 选择
example-pipeline.yaml。系统会为所选流水线打开流水线编辑器。 - 选择运行流水线 ,然后选择您之前创建的开发或暂存环境。
智能体
运行以下提示:
Deploy my pipeline
监控流水线执行情况并检查执行日志
部署流水线后,您可以查看其详细信息、流水线运行历史记录和流水线执行日志:
- 点击活动栏中的 Google Cloud Data Agent Kit 图标。
- 展开数据工程 ,然后选择流水线管理 。
- 点击流水线的名称 (
example-pipeline) 以查看其执行历史记录。在特定日期的运行列表中,您可以查看各个流水线运行以及每个流水线运行中的各个操作的明细。 - 点击任务 ID 以查看任务执行日志。由于示例 PySpark 脚本是在 Managed Service for Apache Spark 中执行的,因此任务日志将包含指向批处理日志的链接。
排查和修复流水线失败问题
当流水线失败时,您会在流水线管理 窗格中看到诊断 按钮。
智能体
点击诊断 按钮后,智能体会生成一个提示,用于排查流水线失败问题。该提示会复制到剪贴板或在新聊天会话中打开。
智能体使用专业技能来排查流水线问题,重点是收集日志、交叉检查已部署的代码和工作区,以及生成根本原因分析 (RCA)。
收到 RCA 后,可能的后续步骤如下:
- 在当前工作区中应用根本原因分析。
- 让智能体创建一个新分支并在其中应用更改。
- 使用 RCA 详细信息打开 Cloud Customer Care 工单。
如需获得有关排查扩展程序问题的帮助,请参阅 问题排查。