使用 Go 创建 Dataflow 流水线
本页面介绍如何使用 Go 版 Apache Beam SDK 构建用于定义流水线的程序。然后,您将在本地和 Dataflow 服务上运行该流水线。如需了解 WordCount 流水线,请观看如何在 Apache Beam 中使用 WordCount 视频。
准备工作
- 登录您的 Google Cloud 账号。如果您是 Google Cloud新手,请 创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
-
安装 Google Cloud CLI。
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init -
选择或创建项目所需的角色
- 选择项目:选择项目不需要特定的 IAM 角色,您可以选择已获授角色的任何项目。
-
创建项目:如需创建项目,您需要拥有 Project Creator 角色 (
roles/resourcemanager.projectCreator),该角色包含resourcemanager.projects.create权限。了解如何授予角色。
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID替换为您的 Google Cloud 项目名称。
启用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON 和 Cloud Resource Manager API:
启用 API 所需的角色
如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (
roles/serviceusage.serviceUsageAdmin),该角色包含serviceusage.services.enable权限。了解如何授予角色。gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
为您的用户账号创建本地身份验证凭证:
gcloud auth application-default login
如果系统返回身份验证错误,并且您使用的是外部身份提供方 (IdP),请确认您已 使用联合身份登录 gcloud CLI。
-
向您的用户账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
替换以下内容:
PROJECT_ID:您的项目 ID。USER_IDENTIFIER:您的用户 账号的标识符。例如,myemail@example.com。ROLE:您授予用户账号的 IAM 角色。
-
安装 Google Cloud CLI。
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init -
选择或创建项目所需的角色
- 选择项目:选择项目不需要特定的 IAM 角色,您可以选择已获授角色的任何项目。
-
创建项目:如需创建项目,您需要拥有 Project Creator 角色 (
roles/resourcemanager.projectCreator),该角色包含resourcemanager.projects.create权限。了解如何授予角色。
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID替换为您的 Google Cloud 项目名称。
启用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON 和 Cloud Resource Manager API:
启用 API 所需的角色
如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (
roles/serviceusage.serviceUsageAdmin),该角色包含serviceusage.services.enable权限。了解如何授予角色。gcloud services enable dataflow
compute_component logging storage_component storage_api cloudresourcemanager.googleapis.com -
为您的用户账号创建本地身份验证凭证:
gcloud auth application-default login
如果系统返回身份验证错误,并且您使用的是外部身份提供方 (IdP),请确认您已 使用联合身份登录 gcloud CLI。
-
向您的用户账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
替换以下内容:
PROJECT_ID:您的项目 ID。USER_IDENTIFIER:您的用户 账号的标识符。例如,myemail@example.com。ROLE:您授予用户账号的 IAM 角色。
向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- 将
PROJECT_ID替换为您的项目 ID。 - 将
PROJECT_NUMBER替换为您的项目编号。 如需查找项目编号,请参阅识别项目或使用gcloud projects describe命令。 - 将
SERVICE_ACCOUNT_ROLE替换为每个角色。
-
创建一个 Cloud Storage 存储桶并按如下所示进行配置:
-
将存储类别设置为
S(标准)。 -
将存储位置设置为以下项:
US(美国)。 -
将
BUCKET_NAME替换为 唯一的存储桶名称。请勿在存储桶名称中添加敏感信息,因为存储桶命名空间是全局性的,公开可见。
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
将存储类别设置为
- 复制 Google Cloud 项目 ID 和 Cloud Storage 存储桶名称。 您将在本快速入门的后面部分用到这些值。
设置开发环境
Apache Beam SDK 是一个用于数据流水线的开源编程模型。 您可以使用 Apache Beam 程序定义流水线,然后选择 Dataflow 等运行程序来运行流水线。
我们建议您在使用 Go 版 Apache Beam SDK 时使用最新版本的 Go。如果您未安装最新版本的 Go,请使用 Go 的下载和安装指南下载并安装适用于您的特定操作系统的 Go。
如需验证您已安装的 Go 的版本,请在本地终端中运行以下命令:
go version运行 Beam 字数统计示例
Go 版 Apache Beam SDK 包含一个 wordcount 流水线示例。wordcount 示例执行以下操作:
- 读取一个文本文件作为输入。默认情况下,它会读取 Cloud Storage 存储桶中资源名称为
gs://dataflow-samples/shakespeare/kinglear.txt的文本文件。 - 将每一行解析为字词。
- 对标记化字词进行词频计数。
如需在本地机器上运行最新版本的 Beam wordcount 示例,请执行以下步骤:
使用
git clone命令克隆apache/beamGitHub 代码库:git clone https://github.com/apache/beam.git切换到
beam/sdks/go目录:cd beam/sdks/go使用以下命令运行流水线:
go run examples/wordcount/wordcount.go \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputsinput标志指定要读取的文件,output标志指定频率计数输出的文件名。
流水线完成后,查看输出结果:
more outputs*如需退出,请按 q。
修改流水线代码
Beam wordcount 流水线区分大写和小写字词。以下步骤展示了如何创建您自己的 Go 模块,修改 wordcount 流水线以使其不区分大小写,以及在 Dataflow 上运行。
创建 Go 模块
如需更改流水线代码,请按照以下步骤操作。
在您选择的位置为您的 Go 模块创建一个目录:
mkdir wordcountcd wordcount创建 Go 模块。在本示例中,请使用
example/dataflow作为模块路径。go mod init example/dataflow从 Apache Beam GitHub 代码库下载
wordcount代码的最新副本。将此文件放入您创建的wordcount目录中。如果您使用的是非 Linux 操作系统,则必须获取 Go
unix软件包。在 Dataflow 服务上运行流水线时需要此软件包。go get -u golang.org/x/sys/unix确保
go.mod文件与模块的源代码匹配:go mod tidy
运行未经修改的流水线
验证未修改的 wordcount 流水线在本地运行。
从终端在本地构建和运行流水线:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs查看输出结果:
more outputs*如需退出,请按 q。
更改流水线代码
如需更改流水线以使其不区分大小写,请修改代码以将 strings.ToLower 函数应用于所有字词。
在您选择的编辑器中,打开
wordcount.go文件。检查
init块(为清楚起见,已移除注释):func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() }添加一个新行以注册
strings.ToLower函数:func init() { register.DoFn3x0[context.Context, string, func(string)](&extractFn{}) register.Function2x1(formatFn) register.Emitter1[string]() register.Function1x1(strings.ToLower) }检查
CountWords函数:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) }要将字词小写,请添加 ParDo,以将
strings.ToLower应用于每个字词:func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Map all letters to lowercase. lowercaseWords := beam.ParDo(s, strings.ToLower, col) // Count the number of times each word occurs. return stats.Count(s, lowercaseWords) }保存文件。
在本地运行更新后的流水线
在本地运行更新后的 wordcount 流水线,并验证输出是否已更改。
构建并运行修改后的
wordcount流水线:go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs查看修改后的流水线的结果:所有字词都应小写。
more outputs*如需退出,请按 q。
在 Dataflow 服务上运行流水线
如需在 Dataflow 服务上运行更新后的 wordcount 示例,请使用以下命令:
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/替换以下内容:
BUCKET_NAME:Cloud Storage 存储桶名称。PROJECT_ID: Google Cloud 项目 ID。DATAFLOW_REGION:要在其中部署 Dataflow 作业的区域。例如europe-west1。 如需查看可用位置的列表,请参阅 Dataflow 位置。--region标志会替换元数据服务器、本地客户端或环境变量中设置的默认区域。
查看结果
您可以在Google Cloud 控制台中查看 Dataflow 作业列表。在 Google Cloud 控制台中,前往 Dataflow 作业页面。
作业页面会显示 wordcount 作业的详细信息,包括状态最初为正在运行,然后变为成功。
使用 Dataflow 运行流水线时,您的结果存储在 Cloud Storage 存储桶中。使用Google Cloud 控制台或本地终端查看输出结果。
控制台
如需在 Google Cloud 控制台中查看结果,请前往 Cloud Storage 存储桶页面。
在项目的存储桶列表中,点击您之前创建的存储桶。您的作业创建的输出文件显示在 results 目录中。
终端
通过终端或使用 Cloud Shell 查看结果。
如需列出输出文件,请使用
gcloud storage ls命令:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long将
BUCKET_NAME替换为指定输出 Cloud Storage 存储桶的名称。如需查看输出文件中的结果,请使用
gcloud storage cat命令:gcloud storage cat gs://BUCKET_NAME/results/outputs*
清理
为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。
- 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区页面。
- 点击要删除的存储分区对应的复选框。
- 如需删除存储分区,请点击删除,然后按照说明操作。
如果您保留项目,请撤消授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
可选:撤消您创建的身份验证凭据,并删除本地凭据文件。
gcloud auth application-default revoke
-
可选:从 gcloud CLI 撤消凭据。
gcloud auth revoke