使用 Python 创建 Dataflow 流水线
本文档介绍如何使用 Python 版 Apache Beam SDK 构建用于定义流水线的程序。然后,您可以使用直接本地运行程序或云端运行程序(如 Dataflow)来运行流水线。如需了解 WordCount 流水线,请观看如何在 Apache Beam 中使用 WordCount 视频。
如需在 Google Cloud 控制台中直接遵循有关此任务的分步指导,请点击操作演示:
准备工作
- 登录您的 Google Cloud 账号。如果您是 Google Cloud新手,请 创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
-
安装 Google Cloud CLI。
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init -
选择或创建项目所需的角色
- 选择项目:选择项目不需要特定的 IAM 角色,您可以选择已获授角色的任何项目。
-
创建项目:如需创建项目,您需要拥有 Project Creator 角色 (
roles/resourcemanager.projectCreator),该角色包含resourcemanager.projects.create权限。了解如何授予角色。
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID替换为您的 Google Cloud 项目名称。
启用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager API:
启用 API 所需的角色
如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (
roles/serviceusage.serviceUsageAdmin),该角色包含serviceusage.services.enable权限。了解如何授予角色。gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
为您的用户账号创建本地身份验证凭证:
gcloud auth application-default login
如果系统返回身份验证错误,并且您使用的是外部身份提供方 (IdP),请确认您已 使用联合身份登录 gcloud CLI。
-
向您的用户账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
替换以下内容:
PROJECT_ID:您的项目 ID。USER_IDENTIFIER:您的用户 账号的标识符。例如,myemail@example.com。ROLE:您授予用户账号的 IAM 角色。
-
安装 Google Cloud CLI。
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init -
选择或创建项目所需的角色
- 选择项目:选择项目不需要特定的 IAM 角色,您可以选择已获授角色的任何项目。
-
创建项目:如需创建项目,您需要拥有 Project Creator 角色 (
roles/resourcemanager.projectCreator),该角色包含resourcemanager.projects.create权限。了解如何授予角色。
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID替换为您的 Google Cloud 项目名称。
启用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager API:
启用 API 所需的角色
如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (
roles/serviceusage.serviceUsageAdmin),该角色包含serviceusage.services.enable权限。了解如何授予角色。gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
为您的用户账号创建本地身份验证凭证:
gcloud auth application-default login
如果系统返回身份验证错误,并且您使用的是外部身份提供方 (IdP),请确认您已 使用联合身份登录 gcloud CLI。
-
向您的用户账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/iam.supportUser, roles/datastream.admin, roles/monitoring.metricsScopesViewer, roles/cloudaicompanion.settingsAdmingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
替换以下内容:
PROJECT_ID:您的项目 ID。USER_IDENTIFIER:您的用户 账号的标识符。例如,myemail@example.com。ROLE:您授予用户账号的 IAM 角色。
向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- 将
PROJECT_ID替换为您的项目 ID。 - 将
PROJECT_NUMBER替换为您的项目编号。 如需查找项目编号,请参阅识别项目或使用gcloud projects describe命令。 - 将
SERVICE_ACCOUNT_ROLE替换为每个角色。
-
创建一个 Cloud Storage 存储桶并按如下所示进行配置:
-
将存储类别设置为
S(标准)。 -
将存储位置设置为以下项:
US(美国)。 -
将
BUCKET_NAME替换为 唯一的存储桶名称。请勿在存储桶名称中添加敏感信息,因为存储桶命名空间是全局性的,公开可见。
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
将存储类别设置为
- 复制 Google Cloud 项目 ID 和 Cloud Storage 存储桶名称。您将在本文档的后面部分用到这些值。
设置环境
在本部分中,您将使用命令提示符,通过 venv 设置独立的 Python 虚拟环境来运行流水线项目。借助此过程,您可以将一个项目的依赖项与其他项目的依赖项隔离开来。
如果您没有便捷易用的命令提示符,可以使用 Cloud Shell。Cloud Shell 已经安装了适用于 Python 3 的软件包管理系统,因此您可以跳过创建虚拟环境的过程。
如需安装 Python,然后创建虚拟环境,请按照以下步骤操作:
- 检查系统中是否已运行 Python 3 和
pip:python --version python -m pip --version
- 如有必要,请安装 Python 3,然后设置 Python 虚拟环境:按照设置 Python 开发环境页面的安装 Python 和设置 venv部分中提供的说明操作。
完成快速入门后,您可以运行 deactivate 来停用虚拟环境。
获取 Apache Beam SDK
Apache Beam SDK 是一个用于数据流水线的开源编程模型。您可以使用 Apache Beam 程序定义流水线,然后选择 Dataflow 等运行程序来运行流水线。
如需下载并安装 Apache Beam SDK,请按照以下步骤操作:
- 验证您在上一部分中创建的 Python 虚拟环境中。确保提示符以
<env_name>开头,其中env_name是虚拟环境的名称。 - 安装 Python 版 Apache Beam SDK 的最新版本:
pip install apache-beam[gcp]
在本地运行流水线
如需查看流水线如何在本地运行,请使用 wordcount 示例的现成 Python 模块,该模块随 apache_beam 软件包提供。
wordcount 流水线示例会执行以下操作:
接收一个文本文件作为输入。
此文本文件位于 Cloud Storage 存储桶中,其资源名称为
gs://dataflow-samples/shakespeare/kinglear.txt。- 将每一行解析为字词。
- 对标记化字词进行词频计数。
如需在本地暂存 wordcount 流水线,请按照以下步骤操作:
- 从本地终端运行
wordcount示例:python -m apache_beam.examples.wordcount \ --output outputs
- 查看该流水线的输出:
more outputs* - 如需退出,请按 q。
wordcount.py 源代码。在 Dataflow 服务上运行流水线
在本部分中,将从 Dataflow 服务上的apache_beam 软件包运行 wordcount 示例流水线。此示例指定 DataflowRunner 作为 --runner 的参数。- 运行流水线:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
替换以下内容:
DATAFLOW_REGION:要在其中部署 Dataflow 作业的区域,例如europe-west1--region标志会替换元数据服务器、本地客户端或环境变量中设置的默认区域。BUCKET_NAME:您之前复制的 Cloud Storage 存储桶名称PROJECT_ID:您之前复制的 Google Cloud 项目 ID
查看结果
使用 Dataflow 运行流水线时,您的结果存储在 Cloud Storage 存储桶中。在本部分中,使用 Google Cloud 控制台或本地终端验证流水线是否正在运行。
Google Cloud 控制台
如需在 Google Cloud 控制台中查看结果,请按照以下步骤操作:
本地终端
通过终端或使用 Cloud Shell 查看结果。
- 如需列出输出文件,请使用
gcloud storage ls命令:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- 如需查看输出文件中的结果,请使用
gcloud storage cat命令:gcloud storage cat gs://BUCKET_NAME/results/outputs*
将 BUCKET_NAME 替换为流水线程序中使用的 Cloud Storage 存储桶的名称。
修改流水线代码
上述示例中的wordcount 流水线区分大写和小写字词。以下步骤演示了如何修改流水线,以使 wordcount 流水线不区分大小写。- 在本地机器上,从 Apache Beam GitHub 代码库下载
wordcount代码的最新副本。 - 从本地终端运行流水线:
python wordcount.py --output outputs
- 查看结果。
more outputs* - 如需退出,请按 q。
- 在您选择的编辑器中,打开
wordcount.py文件。 - 在
run函数中,检查流水线步骤:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
split后面的几行被拆分为字符串形式的字词。 - 如需对字符串进行小写处理,请修改
split后面的行: 此修改会将counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
str.lower函数映射到每个字词上。这一行相当于beam.Map(lambda word: str.lower(word))。 - 保存该文件并运行修改后的
wordcount作业:python wordcount.py --output outputs
- 查看修改后的流水线的结果:
more outputs* - 如需退出,请按 q。
- 在 Dataflow 服务上运行修改后的流水线:
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
替换以下内容:
DATAFLOW_REGION:要在其中部署 Dataflow 作业的区域BUCKET_NAME:您的 Cloud Storage 存储桶名称PROJECT_ID:您的 Google Cloud 项目 ID
清理
为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。
- 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区页面。
- 点击要删除的存储分区对应的复选框。
- 如需删除存储分区,请点击删除,然后按照说明操作。
如果您保留项目,请撤消授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
可选:撤消您创建的身份验证凭据,并删除本地凭据文件。
gcloud auth application-default revoke
-
可选:从 gcloud CLI 撤消凭据。
gcloud auth revoke