构建数据工程流水线

本指南介绍了如何在 Antigravity IDE 的 Google Cloud Data Agent Kit 扩展程序中创建和部署编排流水线。

示例流水线在 Managed Service for Apache Spark 中运行 PySpark 脚本。

您可以从 Antigravity IDE 将编排流水线部署为本地版本,也可以通过 GitHub 操作(例如在将更改合并到 main 分支时)进行部署。本文档演示了如何部署编排流水线的本地版本。

准备工作

在开始之前,请完成以下操作:

  1. 为 Antigravity IDE 安装 Data Agent Kit 扩展程序
  2. 配置设置
  3. 将 GitHub 代码库添加到 Antigravity IDE 工作区,以存储 Orchestration Pipelines 和脚本等资产。

查看所需的 IAM 角色

如需获得在项目中创建资源、部署和运行 Orchestration Pipelines 的权限,请让管理员为您授予所需的角色。

如需创建和管理 Managed Service for Apache Airflow 环境并管理其关联存储分区中的对象,您需要以下角色。如需详细了解这些用户角色,请参阅Managed Service for Apache Airflow 文档中的 为用户授予角色

  • Environment and Storage Object Administrator (composer.environmentAndStorageObjectAdmin)
  • Service Account User (iam.serviceAccountUser)

如需使用 BigQuery 和 Cloud Storage 资源,您需要以下角色。

  • BigQuery Data Editor (roles/bigquery.dataEditor)
  • Storage Object Admin (roles/storage.objectAdmin)

根据您计划访问的资源,除了允许您使用扩展程序和使用编排流水线的角色之外,您可能还需要其他角色。

创建服务帐号并为其授予 IAM 角色

为 Managed Airflow Gen 3 环境使用唯一的服务帐号。 该服务帐号会创建 Managed Airflow Gen 3 环境并运行您部署的所有编排流水线。

请让管理员完成以下步骤:

  1. 按照 IAM 文档中的说明创建服务账号。
  2. 向服务账号授予 Composer Worker (composer.worker) 角色。在大多数情况下,此角色提供所需的权限。

作为最佳实践,如果您需要访问项目中的其他资源,请仅在 Orchestration Pipelines 操作需要时才向此服务账号授予其他权限。Google Cloud

为编排流水线创建 Google Cloud 资源

在此步骤中,创建 Google Cloud 资源,用于编排 流水线。

创建 Managed Airflow Gen 3 环境

使用以下配置创建 Managed Airflow Gen 3 环境:

  • 环境名称:输入您稍后将用于配置 编排流水线的名称。例如,example-pipeline-scheduler
  • 位置:选择一个位置。我们建议您在本指南中创建的所有资源都位于同一位置。例如,us-central1
  • 服务账号:选择您为此 环境创建的服务帐号。

以下 Google Cloud CLI 命令示例演示了语法:

gcloud composer environments create example-pipeline-scheduler \
  --location us-central1 \
  --image-version composer-3-airflow-2 \
  --service-account "example-account@example-project.iam.gserviceaccount.com"

向调度器配置添加环境参数

提供将执行编排流水线的 Managed Airflow 环境的连接详细信息。

使用 Google Cloud Data Agent Kit 设置编辑器添加您创建的环境的配置参数:

  1. 点击活动栏中的 Google Cloud Data Agent Kit 图标。
  2. 展开设置 ,然后点击设置
  3. 选择调度器
  4. 输入您之前创建的 Managed Airflow Gen 3 环境的参数:
    • 项目 ID:环境所在项目的名称。 示例:example-project
    • 区域:环境所在的区域。示例:us-central1
    • 环境:环境的名称。示例:example-pipeline-scheduler
  5. 点击保存

为流水线工件创建存储桶

在与 Managed Airflow 环境相同的项目中创建一个 Cloud Storage 存储桶,并为其指定类似于 example-pipelines-bucket的名称。此存储桶是存储 Managed Service for Apache Spark 作业所必需的。

某些流水线操作(例如将结果输出到 Cloud Storage 存储桶)。

在 BigQuery 中创建新的数据集和表

本指南演示了将数据写入 BigQuery 表的流水线。在项目中创建以下 BigQuery 资源:

  1. 创建一个新数据集,名为 wordcount_dataset
  2. 创建一个新的 BigQuery 表,名为 wordcount_output

添加流水线资产

本指南演示了使用 PySpark 执行的常见数据工程任务(ETL:提取、转换、加载),即从 BigQuery 读取数据、转换数据(字数统计)并将其加载回 BigQuery。

非智能体

将以下文件添加到代码库的 /scripts 文件夹。您稍后会添加一个流水线操作,该操作会在 Managed Service for Apache Spark 中运行此脚本。

wordcount.py 文件示例:

#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)

# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')

# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
    'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()

# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()

print(f"Successfully wrote word counts to BigQuery table: {destination_table}")

替换以下内容:

  • ARTIFACTS_BUCKET_NAME:您之前创建的 Cloud Storage 存储桶的名称。示例:example-pipelines-bucket
  • PROJECT_ID:环境 所在项目的名称。示例:example-project

智能体

提示智能体在代码库的 /scripts 文件夹中生成示例 PySpark 脚本。您稍后会添加一个流水线操作,该操作会在 Managed Service for Apache Spark 中运行此脚本。

输入类似于以下内容的提示:

I want to create a PySpark script that does the following:

1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.

My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.

Save the resulting script to /scripts as wordcount.py

在代码库中初始化编排流水线

初始化编排流水线时,Antigravity IDE 的 Data Agent Kit 扩展程序会创建一个脚手架,其中包含以下内容:

  • 编排流水线 YAML 文件:包含时间表但未定义操作的示例流水线定义。
  • deployment.yaml:示例流水线部署配置,用于定义流水线的部署方式。此文件演示了 Managed Airflow 环境、工件存储桶以及流水线操作使用的任何其他资源的所需配置。
  • .github/workflows/deploy.yaml:设置 GitHub 操作,以便在您将更改合并到 GitHub 代码库的 main 分支时部署流水线。
  • .github/workflows/validate.yaml:设置 GitHub 操作,以便在流水线部署后对其进行验证。

在本文档的后续步骤中,您将使用 Antigravity IDE 的 Data Agent Kit 扩展程序展开这些定义,以在本地创建和部署编排流水线。

非智能体

如需初始化编排流水线,请执行以下操作:

  1. 点击活动栏中的 Google Cloud Data Agent Kit 图标。
  2. 展开数据工程 ,然后点击初始化编排 流水线
  3. 输入新编排流水线的参数:
  4. 流水线 ID:输入流水线的 ID。示例:example-pipeline
  5. Google Cloud 项目 ID:环境 所在项目的名称。示例:example-project
  6. 区域:环境所在的区域。示例:us-central1
  7. 环境 ID:您要用于开发的环境的名称。 示例:dev/staging
  8. Scheduler Managed Service for Apache Airflow 环境: 您要在其中编排流水线的环境的名称。对于本文档,请在此参数中指定相同的环境。

  9. 工件存储桶:用于流水线工件的存储桶的名称, 不带 gs:// 前缀。示例:example-pipelines-bucket

  10. 点击下一步

  11. 点击初始化

  12. 指定您希望在其中初始化流水线的工作区。

智能体

让智能体为代码库的编排流水线创建脚手架。

输入类似于以下内容的提示:

Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.

The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.

The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.

Store pipeline artifacts in example-pipelines-bucket.

在代码库中初始化流水线后,您无法再次执行此操作,因为新的基架会覆盖您所做的任何配置更改。您可以在项目中创建新的流水线定义文件并将其添加到部署配置中,以添加新的流水线。

向流水线添加新任务

由于初始流水线配置没有任何操作,因此您需要向其添加一个运行 PySpark 脚本的操作。

非智能体

如需修改流水线,请执行以下操作:

  1. 点击活动栏中的 Google Cloud Data Agent Kit 图标。
  2. 依次展开数据工程编排流水线
  3. 选择 example-pipeline.yaml。系统会为所选流水线打开流水线编辑器。
  4. 可选:选择时间表触发器 节点。您可以通过指定类似 Cron 的表达式以及时间表开始和结束时间来调整流水线的时间表。新初始化的流水线的默认时间表为 0 2 * * *,每天凌晨 2 点运行。
  1. 添加新任务。在本指南中,您将添加一个 PySpark 任务,该任务运行您之前添加的 PySpark 脚本:

    1. 点击添加第一个任务 以添加新任务节点。
    2. 选择执行 PySpark 脚本script/wordcount.py 文件。

    系统会打开执行 PySpark 脚本 面板。

    1. 在 Spark 集群模式下,选择 Serverless Spark
    2. 位置中,指定环境所在的位置。 示例:us-central1
    3. 点击保存

智能体

运行以下提示:

Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.

部署流水线的本地版本

部署流水线的本地版本,以确认其配置正确。

部署编排流水线的本地版本时,Antigravity IDE 的 Data Agent Kit 扩展程序会将流水线软件包的本地版本上传到 Managed Airflow 环境并运行该版本。本地部署旨在用于在开发环境中工作。

deploy 命令会部署未暂停的时间表。为避免这种情况,您可以在“流水线管理”窗格中手动暂停时间表。您还可以修改流水线 YAML 文件,以注释掉或移除 triggers: - schedule 代码块。

非智能体

如需部署示例编排流水线的本地版本,请执行以下操作:

  1. 点击活动栏中的 Google Cloud Data Agent Kit 图标。
  2. 依次展开数据工程Orchestration Pipelines
  3. 选择 example-pipeline.yaml。系统会为所选流水线打开流水线编辑器。
  4. 选择运行流水线 ,然后选择您之前创建的开发或暂存环境。

智能体

运行以下提示:

Deploy my pipeline

监控流水线执行情况并检查执行日志

部署流水线后,您可以查看其详细信息、流水线运行作业的历史记录以及流水线执行日志:

  1. 点击活动栏中的 Google Cloud Data Agent Kit 图标。
  2. 展开数据工程 ,然后选择流水线管理
  3. 点击流水线的名称 (example-pipeline) 以查看其执行历史记录。在特定日期的运行列表中,您可以查看各个流水线运行以及每个流水线运行中各个操作的明细。
  4. 点击任务 ID 以查看任务执行日志。由于示例 PySpark 脚本是在 Managed Service for Apache Spark 中执行的,因此任务日志将包含指向批处理日志的链接。

排查和修复流水线失败问题

当流水线失败时,您会在流水线管理 窗格中看到诊断 按钮。

智能体

点击诊断 按钮后,智能体会生成一个提示来排查流水线失败问题。该提示会复制到剪贴板或在新聊天会话中打开。

智能体使用专门的技能来排查流水线问题,重点是收集日志、交叉检查已部署的代码和工作区,以及生成根本原因分析 (RCA)。

收到 RCA 后,可能的后续步骤如下:

  • 在当前工作区中应用根本原因分析。
  • 让智能体创建一个新分支并在其中应用更改。
  • 使用 RCA 详细信息打开 Cloud Customer Care 工单。

如需获得有关排查扩展程序问题的帮助,请参阅 问题排查

后续步骤