使用 Python 创建 Dataflow 流水线

本文档介绍如何使用 Python 版 Apache Beam SDK 构建用于定义流水线的程序。然后,您可以使用直接本地运行程序或云端运行程序(如 Dataflow)来运行流水线。如需了解 WordCount 流水线,请观看如何在 Apache Beam 中使用 WordCount 视频。


如需在 Google Cloud 控制台中直接遵循有关此任务的分步指导,请点击操作演示

操作演示


准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud新手,请 创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。

  3. 如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI

  4. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  5. 创建或选择 Google Cloud 项目

    选择或创建项目所需的角色

    • 选择项目:选择项目不需要特定的 IAM 角色,您可以选择已获授角色的任何项目。
    • 创建项目:如需创建项目,您需要拥有 Project Creator 角色 (roles/resourcemanager.projectCreator),该角色包含 resourcemanager.projects.create 权限。了解如何授予角色
    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目名称。

  6. 验证是否已为您的 Google Cloud 项目启用结算功能

  7. 启用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager API:

    启用 API 所需的角色

    如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (roles/serviceusage.serviceUsageAdmin),该角色包含 serviceusage.services.enable 权限。了解如何授予角色

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  8. 为您的用户账号创建本地身份验证凭证:

    gcloud auth application-default login

    如果系统返回身份验证错误,并且您使用的是外部身份提供方 (IdP),请确认您已 使用联合身份登录 gcloud CLI

  9. 向您的用户账号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • USER_IDENTIFIER:您的用户 账号的标识符。例如,myemail@example.com
    • ROLE:您授予用户账号的 IAM 角色。
  10. 安装 Google Cloud CLI。

  11. 如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI

  12. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  13. 创建或选择 Google Cloud 项目

    选择或创建项目所需的角色

    • 选择项目:选择项目不需要特定的 IAM 角色,您可以选择已获授角色的任何项目。
    • 创建项目:如需创建项目,您需要拥有 Project Creator 角色 (roles/resourcemanager.projectCreator),该角色包含 resourcemanager.projects.create 权限。了解如何授予角色
    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目名称。

  14. 验证是否已为您的 Google Cloud 项目启用结算功能

  15. 启用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager API:

    启用 API 所需的角色

    如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (roles/serviceusage.serviceUsageAdmin),该角色包含 serviceusage.services.enable 权限。了解如何授予角色

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  16. 为您的用户账号创建本地身份验证凭证:

    gcloud auth application-default login

    如果系统返回身份验证错误,并且您使用的是外部身份提供方 (IdP),请确认您已 使用联合身份登录 gcloud CLI

  17. 向您的用户账号授予角色。对以下每个 IAM 角色运行以下命令一次: roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • USER_IDENTIFIER:您的用户 账号的标识符。例如,myemail@example.com
    • ROLE:您授予用户账号的 IAM 角色。
  18. 向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • PROJECT_ID 替换为您的项目 ID。
    • PROJECT_NUMBER 替换为您的项目编号。 如需查找项目编号,请参阅识别项目或使用 gcloud projects describe 命令。
    • SERVICE_ACCOUNT_ROLE 替换为每个角色。
  19. 创建一个 Cloud Storage 存储桶并按如下所示进行配置:
    • 将存储类别设置为 S(标准)。
    • 将存储位置设置为以下项: US(美国)。
    • BUCKET_NAME 替换为 唯一的存储桶名称。请勿在存储桶名称中添加敏感信息,因为存储桶命名空间是全局性的,公开可见。
    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  20. 复制 Google Cloud 项目 ID 和 Cloud Storage 存储桶名称。您将在本文档的后面部分用到这些值。

设置环境

在本部分中,您将使用命令提示符,通过 venv 设置独立的 Python 虚拟环境来运行流水线项目。借助此过程,您可以将一个项目的依赖项与其他项目的依赖项隔离开来。

如果您没有便捷易用的命令提示符,可以使用 Cloud Shell。Cloud Shell 已经安装了适用于 Python 3 的软件包管理系统,因此您可以跳过创建虚拟环境的过程。

如需安装 Python,然后创建虚拟环境,请按照以下步骤操作:

  1. 检查系统中是否已运行 Python 3 和 pip
    python --version
    python -m pip --version
  2. 如有必要,请安装 Python 3,然后设置 Python 虚拟环境:按照设置 Python 开发环境页面的安装 Python 和设置 venv部分中提供的说明操作。

完成快速入门后,您可以运行 deactivate 来停用虚拟环境。

获取 Apache Beam SDK

Apache Beam SDK 是一个用于数据流水线的开源编程模型。您可以使用 Apache Beam 程序定义流水线,然后选择 Dataflow 等运行程序来运行流水线。

如需下载并安装 Apache Beam SDK,请按照以下步骤操作:

  1. 验证您在上一部分中创建的 Python 虚拟环境中。确保提示符以 <env_name> 开头,其中 env_name 是虚拟环境的名称。
  2. 安装 Python 版 Apache Beam SDK 的最新版本:
  3. pip install apache-beam[gcp]

在本地运行流水线

如需查看流水线如何在本地运行,请使用 wordcount 示例的现成 Python 模块,该模块随 apache_beam 软件包提供。

wordcount 流水线示例会执行以下操作:

  1. 接收一个文本文件作为输入。

    此文本文件位于 Cloud Storage 存储桶中,其资源名称为 gs://dataflow-samples/shakespeare/kinglear.txt

  2. 将每一行解析为字词。
  3. 对标记化字词进行词频计数。

如需在本地暂存 wordcount 流水线,请按照以下步骤操作:

  1. 从本地终端运行 wordcount 示例:
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. 查看该流水线的输出:
    more outputs*
  3. 如需退出,请按 q
通过在本地运行流水线,您可以测试和调试 Apache Beam 程序。您可以在 Apache Beam GitHub 上查看 wordcount.py 源代码。

在 Dataflow 服务上运行流水线

在本部分中,将从 Dataflow 服务上的 apache_beam 软件包运行 wordcount 示例流水线。此示例指定 DataflowRunner 作为 --runner 的参数。
  • 运行流水线:
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    替换以下内容:

    • DATAFLOW_REGION:要在其中部署 Dataflow 作业的区域,例如 europe-west1

      --region 标志会替换元数据服务器、本地客户端或环境变量中设置的默认区域。

    • BUCKET_NAME:您之前复制的 Cloud Storage 存储桶名称
    • PROJECT_ID:您之前复制的 Google Cloud 项目 ID

查看结果

使用 Dataflow 运行流水线时,您的结果存储在 Cloud Storage 存储桶中。在本部分中,使用 Google Cloud 控制台或本地终端验证流水线是否正在运行。

Google Cloud 控制台

如需在 Google Cloud 控制台中查看结果,请按照以下步骤操作:

  1. 在 Google Cloud 控制台中,前往 Dataflow 作业页面。

    转到作业

    作业页面会显示 wordcount 作业的详细信息,包括状态最初为正在运行,然后变为成功

  2. 进入 Cloud Storage 存储桶页面。

    前往存储桶

  3. 在项目的存储桶列表中,点击您之前创建的存储桶。

    wordcount 目录中,系统会显示已创建的作业的输出文件。

本地终端

通过终端或使用 Cloud Shell 查看结果。

  1. 如需列出输出文件,请使用 gcloud storage ls 命令
    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
  2. BUCKET_NAME 替换为流水线程序中使用的 Cloud Storage 存储桶的名称。

  3. 如需查看输出文件中的结果,请使用 gcloud storage cat 命令
    gcloud storage cat gs://BUCKET_NAME/results/outputs*

修改流水线代码

上述示例中的 wordcount 流水线区分大写和小写字词。以下步骤演示了如何修改流水线,以使 wordcount 流水线不区分大小写。
  1. 在本地机器上,从 Apache Beam GitHub 代码库下载 wordcount 代码的最新副本。
  2. 从本地终端运行流水线:
    python wordcount.py --output outputs
  3. 查看结果。
    more outputs*
  4. 如需退出,请按 q
  5. 在您选择的编辑器中,打开 wordcount.py 文件。
  6. run 函数中,检查流水线步骤:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    split 后面的几行被拆分为字符串形式的字词。

  7. 如需对字符串进行小写处理,请修改 split 后面的行:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    此修改会将 str.lower 函数映射到每个字词上。这一行相当于 beam.Map(lambda word: str.lower(word))
  8. 保存该文件并运行修改后的 wordcount 作业:
    python wordcount.py --output outputs
  9. 查看修改后的流水线的结果:
    more outputs*
  10. 如需退出,请按 q
  11. 在 Dataflow 服务上运行修改后的流水线:
    python wordcount.py \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    替换以下内容:

    • DATAFLOW_REGION:要在其中部署 Dataflow 作业的区域
    • BUCKET_NAME:您的 Cloud Storage 存储桶名称
    • PROJECT_ID:您的 Google Cloud 项目 ID

清理

为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。

  1. 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区页面。

    进入“存储桶”

  2. 点击要删除的存储分区对应的复选框。
  3. 如需删除存储分区,请点击删除,然后按照说明操作。
  4. 如果您保留项目,请撤消授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. 可选:撤消您创建的身份验证凭据,并删除本地凭据文件。

    gcloud auth application-default revoke
  6. 可选:从 gcloud CLI 撤消凭据。

    gcloud auth revoke

后续步骤