构建数据工程流水线

本指南介绍了如何在 Visual Studio Code 的 Google Cloud Data Agent Kit 扩展程序中创建和部署编排流水线。

此流水线示例在 Managed Service for Apache Spark 中运行 PySpark 脚本。

您可以从 VS Code 部署编排流水线,以本地版本或通过 GitHub 操作(例如将更改合并到 main 分支时)进行部署。本文档演示了如何部署编排流水线的本地版本。

准备工作

在开始之前,请完成以下步骤:

  1. 安装适用于 VS Code 的 Data Agent Kit 扩展程序
  2. 配置设置
  3. 将 GitHub 代码库添加到 VS Code 工作区,以存储 Orchestration Pipelines 和脚本等资源。

查看所需的 IAM 角色

如需获得在项目中创建资源、部署和运行 Orchestration Pipelines 的权限,请让管理员向您授予所需角色。

如需创建和管理 Managed Service for Apache Airflow 环境并管理其关联存储分区中的对象,您需要以下角色。如需详细了解这些用户角色,请参阅 Managed Service for Apache Airflow 文档中的向用户授予角色

  • Environment and Storage Object Administrator (composer.environmentAndStorageObjectAdmin)
  • Service Account User (iam.serviceAccountUser)

如需使用 BigQuery 和 Cloud Storage 资源,您需要具备以下角色。

  • BigQuery Data Editor (roles/bigquery.dataEditor)
  • Storage Object Admin (roles/storage.objectAdmin)

根据您计划访问的资源,除了允许您使用扩展程序和处理编排流水线的角色之外,您可能还需要其他角色。

创建服务账号并向其授予 IAM 角色

为 Managed Airflow Gen 3 环境使用唯一的服务账号。该服务账号用于创建托管式 Airflow Gen 3 环境,并运行您部署的所有编排流水线。

请让管理员完成以下步骤:

  1. 按照 IAM 文档中的说明创建服务账号
  2. 向服务账号授予 Composer Worker (composer.worker) 角色。在大多数情况下,此角色可提供所需的权限。

最佳实践:如果您需要访问Google Cloud 项目中的其他资源,请仅在编排流水线操作需要时才向此服务账号授予额外权限。

为编排流水线创建 Google Cloud 资源

在此步骤中,为编排流水线创建 Google Cloud 资源。

创建托管式 Airflow Gen 3 环境

创建具有以下配置的 Managed Airflow Gen 3 环境

  • 环境名称:输入一个名称,您稍后将使用该名称来配置编排流水线。例如 example-pipeline-scheduler
  • 位置:选择一个位置。建议您在本指南中在同一位置创建所有资源。例如 us-central1
  • 服务账号:选择您为此环境创建的服务账号。

以下 Google Cloud CLI 命令示例展示了相应语法:

gcloud composer environments create example-pipeline-scheduler \
  --location us-central1 \
  --image-version composer-3-airflow-2 \
  --service-account "example-account@example-project.iam.gserviceaccount.com"

向调度程序配置添加了环境参数

提供将执行编排流水线的 Managed Airflow 环境的连接详细信息。

使用 Google Cloud Data Agent Kit 设置编辑器添加您创建的环境的配置参数:

  1. 点击活动栏中的 Google Cloud Data Agent Kit 图标。
  2. 展开设置,然后点击设置
  3. 选择调度程序
  4. 输入您之前创建的 Managed Airflow 第 3 代环境的参数:
    • 项目 ID:环境所在项目的名称。 示例:example-project
    • 区域:环境所在的区域。示例:us-central1
    • 环境:环境的名称。示例:example-pipeline-scheduler
  5. 点击保存

为流水线工件创建存储桶

在与 Managed Airflow 环境相同的项目中创建一个 Cloud Storage 存储桶,并为其指定一个类似于 example-pipelines-bucket 的名称。此存储桶是存储 Managed Service for Apache Spark 作业所必需的。

某些流水线操作,例如将结果输出到 Cloud Storage 存储桶。

在 BigQuery 中创建新的数据集和表

本指南演示了将数据写入 BigQuery 表的流水线。在项目中创建以下 BigQuery 资源:

  1. 创建名为 wordcount_dataset 的新数据集
  2. 创建名为 wordcount_output 的新 BigQuery 表

添加流水线资产

本指南演示了如何使用 PySpark 执行常见的数据工程任务 (ETL:提取、转换、加载),即从 BigQuery 读取数据、转换数据(字数统计)并将其加载回 BigQuery。

非智能体

将以下文件添加到代码库的 /scripts 文件夹中。您稍后会添加一个流水线操作,用于在 Managed Service for Apache Spark 中运行此脚本。

wordcount.py 文件示例:

#!/usr/bin/python
"""BigQuery I/O PySpark example for Word Count"""

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName('spark-bigquery-demo') \
.getOrCreate()

# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the connector.
bucket = ARTIFACTS_BUCKET_NAME
spark.conf.set('temporaryGcsBucket', bucket)

# Load data from BigQuery public dataset (Shakespeare).
words = spark.read.format('bigquery') \
.option('table', 'bigquery-public-data:samples.shakespeare') \
.load()
words.createOrReplaceTempView('words')

# Perform word count using Spark SQL.
# This query counts occurrences of each word.
word_count = spark.sql(
    'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC'
)
word_count.show()
word_count.printSchema()

# Saving the results to a new table in BigQuery.
# Replace YOUR_PROJECT_ID with your project ID.
destination_table = 'PROJECT_ID:wordcount_dataset.wordcount_output'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.mode('overwrite') \
.save()

print(f"Successfully wrote word counts to BigQuery table: {destination_table}")

替换以下内容:

  • ARTIFACTS_BUCKET_NAME:您之前创建的 Cloud Storage 存储桶的名称。示例:example-pipelines-bucket
  • PROJECT_ID:环境所在的项目的名称。示例:example-project

智能体

提示智能体在代码库的 /scripts 文件夹中生成一个 PySpark 脚本示例。您稍后会添加一个流水线操作,用于在 Managed Service for Apache Spark 中运行此脚本。

输入类似于以下内容的提示:

I want to create a PySpark script that does the following:

1. Loads data from the bigquery-public-data:samples.shakespeare.
2. Counts occurrences of each word across all works using a Spark SQL query.
Sum the existing word counts for each word to get the total occurrences.
I want the results to be ordered by the word popularity, most popular first.
3. Saves results to a new table in BigQuery, in my project.

My project is sample-project, the destination table is
wordcount_dataset.wordcount_output, and I want to store temporary BigQuery
export data in example-pipelines-bucket.

Save the resulting script to /scripts as wordcount.py

在代码库中初始化编排流水线

初始化 Orchestration Pipelines 时,VS Code 的 Data Agent Kit 扩展程序会创建一个包含以下内容的基架:

  • 编排流水线 YAML 文件:包含时间表的流水线定义示例,但未定义任何操作。
  • deployment.yaml:一个示例流水线部署配置,用于定义流水线的部署方式。此文件演示了 Managed Airflow 环境、制品桶以及流水线操作使用的任何其他资源所需的配置。
  • .github/workflows/deploy.yaml:设置 GitHub 操作,以便在您将更改合并到 GitHub 代码库的 main 分支时部署流水线。
  • .github/workflows/validate.yaml:设置在流水线部署后对其进行验证的 GitHub 操作。

在本文档的后续步骤中,您将使用 VS Code 的 Data Agent Kit 扩展程序来扩展这些定义,以便在本地创建和部署编排流水线。

非智能体

如需初始化编排流水线,请执行以下操作:

  1. 点击活动栏中的 Google Cloud Data Agent Kit 图标。
  2. 展开 Data Engineering,然后点击 Initialize orchestration pipeline
  3. 输入新编排流水线的参数:
  4. 流水线 ID:输入流水线的 ID。示例:example-pipeline
  5. Google Cloud 项目 ID:环境所在项目的名称。示例:example-project
  6. 区域:环境所在的区域。示例:us-central1
  7. 环境 ID:您要用于开发的环境的名称。 示例:dev/staging
  8. 调度程序 Managed Service for Apache Airflow 环境:您要编排流水线的环境的名称。对于此文档,请在此参数中指定相同的环境。

  9. 制品存储桶:用于流水线制品的存储桶的名称,不含 gs:// 前缀。示例:example-pipelines-bucket

  10. 点击下一步

  11. 点击初始化

  12. 指定您希望初始化流水线的工作区。

智能体

让代理为您的代码库创建编排流水线的框架。

输入类似于以下内容的提示:

Initialize orchestration pipelines in my repository. Don't add any actions
or schedule yet. I want to do it later.

The pipeline is my-sample-pipeline, the project ID is my-project, and the
region is us-central1.

The environment ID is my-test-environment. Use the same environment ID for
the Scheduler Managed Service.

Store pipeline artifacts in example-pipelines-bucket.

在代码库中初始化流水线后,您无法再次执行此操作,因为新的脚手架会覆盖您所做的任何配置更改。您可以在项目中创建新的流水线定义文件,并将其添加到部署配置中,从而添加新的流水线。

向流水线添加新任务

由于初始流水线配置没有任何操作,因此您需要向其中添加一个用于运行 PySpark 脚本的操作。

非智能体

如需修改流水线,请执行以下操作:

  1. 点击活动栏中的 Google Cloud Data Agent Kit 图标。
  2. 展开数据工程,然后展开编排流水线
  3. 选择 example-pipeline.yaml。系统会打开所选流水线的流水线编辑器。
  4. 可选:选择安排触发器节点。您可以指定类似 Cron 的表达式以及时间表的开始和结束时间,从而调整流水线的时间表。新初始化的流水线的默认时间表为 0 2 * * *,每天凌晨 2 点运行。
  1. 添加新任务。在本指南中,您将添加一个 PySpark 任务,该任务会运行您之前添加的 PySpark 脚本:

    1. 点击添加第一个任务以添加新的任务节点。
    2. 选择 Execute PySpark scriptscript/wordcount.py 文件。

    系统会打开执行 PySpark 脚本面板。

    1. 在“Spark 集群模式”中,选择无服务器 Spark
    2. 位置中,指定环境所在的位置。 示例:us-central1
    3. 点击保存

智能体

运行以下提示:

Add the wordcount.py script to the pipeline. I want to run it in Serverless
Spark every day at 1 AM. Run it in the same region where the environment that
runs my pipeline is located. Use the minimal resource profile.

部署流水线的本地版本

部署流水线的本地版本,以确认其配置正确。

部署本地版本的编排流水线时,VS Code 的 Data Agent Kit 扩展程序会将本地版本的流水线软件包上传到 Managed Airflow 环境并运行该软件包。本地部署旨在用于开发环境。

部署命令用于部署未暂停的时间表。为防止这种情况,您可以在“流水线管理”窗格中手动暂停时间表。您还可以修改流水线 YAML 文件,以注释掉或移除 triggers: - schedule 块。

非智能体

如需部署示例编排流水线的本地版本,请执行以下操作:

  1. 点击活动栏中的 Google Cloud Data Agent Kit 图标。
  2. 展开数据工程,然后展开Orchestration Pipelines
  3. 选择 example-pipeline.yaml。系统会打开所选流水线的流水线编辑器。
  4. 选择 Run pipeline,然后选择您之前创建的开发或预发布环境。

智能体

运行以下提示:

Deploy my pipeline

监控流水线执行情况并检查执行日志

流水线部署完毕后,您可以查看其详细信息、流水线运行历史记录和流水线执行日志:

  1. 点击活动栏中的 Google Cloud Data Agent Kit 图标。
  2. 展开数据工程,然后选择流水线管理
  3. 点击流水线的名称 (example-pipeline) 以查看其执行历史记录。在特定日期的运行列表中,您可以查看各个流水线运行以及每次流水线运行中各个操作的细分情况。
  4. 点击任务 ID 即可查看任务执行日志。由于示例 PySpark 脚本是在 Managed Service for Apache Spark 中执行的,因此任务日志将包含指向批量日志的链接。

排查和修复流水线失败问题

当流水线失败时,您会在流水线管理窗格中看到诊断按钮。

智能体

点击诊断按钮后,代理会生成提示,以排查流水线失败问题。提示会复制到剪贴板或在新聊天会话中打开。

该代理使用专业技能来排查流水线问题,重点在于收集日志、交叉检查已部署的代码和工作区,以及生成根本原因分析 (RCA)。

收到 RCA 后,您可以采取的后续步骤如下:

  • 在当前工作区中应用根本原因分析。
  • 让代理创建新分支,并在其中应用更改。
  • 创建包含 RCA 详细信息的 Cloud Customer Care 工单。

如需有关排查扩展程序问题的帮助,请参阅排查 VS Code 的 Data Agent Kit 扩展程序问题

后续步骤