使用 Java 创建 Dataflow 流水线
本文档介绍了如何设置 Google Cloud 项目、创建使用 Java 版 Apache Beam SDK 构建的示例流水线,以及在 Dataflow 服务上运行示例流水线。该流水线会从 Cloud Storage 读取文本文件,计算该文件中不重复字词的数量,然后将字数统计写回 Cloud Storage。如需了解 WordCount 流水线,请观看如何在 Apache Beam 中使用 WordCount 视频。
本教程需要使用 Maven,但您也可以将示例项目从 Maven 转换为 Gradle。如需了解详情,请参阅可选:从 Maven 转换为 Gradle。
如需在 Google Cloud 控制台中直接遵循有关此任务的分步指导,请点击操作演示:
准备工作
- 登录您的 Google Cloud 账号。如果您是 Google Cloud新手,请 创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
-
安装 Google Cloud CLI。
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init -
选择或创建项目所需的角色
- 选择项目:选择项目不需要特定的 IAM 角色,您可以选择已获授角色的任何项目。
-
创建项目:如需创建项目,您需要拥有 Project Creator 角色 (
roles/resourcemanager.projectCreator),该角色包含resourcemanager.projects.create权限。了解如何授予角色。
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID替换为您的 Google Cloud 项目名称。
启用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager API:
启用 API 所需的角色
如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (
roles/serviceusage.serviceUsageAdmin),该角色包含serviceusage.services.enable权限。了解如何授予角色。gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
为您的用户账号创建本地身份验证凭证:
gcloud auth application-default login
如果系统返回身份验证错误,并且您使用的是外部身份提供方 (IdP),请确认您已 使用联合身份登录 gcloud CLI。
-
向您的用户账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
替换以下内容:
PROJECT_ID:您的项目 ID。USER_IDENTIFIER:您的用户 账号的标识符。例如,myemail@example.com。ROLE:您授予用户账号的 IAM 角色。
-
安装 Google Cloud CLI。
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init -
选择或创建项目所需的角色
- 选择项目:选择项目不需要特定的 IAM 角色,您可以选择已获授角色的任何项目。
-
创建项目:如需创建项目,您需要拥有 Project Creator 角色 (
roles/resourcemanager.projectCreator),该角色包含resourcemanager.projects.create权限。了解如何授予角色。
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID替换为您的 Google Cloud 项目名称。
启用 Dataflow、Compute Engine、Cloud Logging、Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Datastore 和 Cloud Resource Manager API:
启用 API 所需的角色
如需启用 API,您需要拥有 Service Usage Admin IAM 角色 (
roles/serviceusage.serviceUsageAdmin),该角色包含serviceusage.services.enable权限。了解如何授予角色。gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
为您的用户账号创建本地身份验证凭证:
gcloud auth application-default login
如果系统返回身份验证错误,并且您使用的是外部身份提供方 (IdP),请确认您已 使用联合身份登录 gcloud CLI。
-
向您的用户账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/iam.serviceAccountUsergcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
替换以下内容:
PROJECT_ID:您的项目 ID。USER_IDENTIFIER:您的用户 账号的标识符。例如,myemail@example.com。ROLE:您授予用户账号的 IAM 角色。
向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- 将
PROJECT_ID替换为您的项目 ID。 - 将
PROJECT_NUMBER替换为您的项目编号。 如需查找项目编号,请参阅识别项目或使用gcloud projects describe命令。 - 将
SERVICE_ACCOUNT_ROLE替换为每个角色。
-
创建一个 Cloud Storage 存储桶并按如下所示进行配置:
-
将存储类别设置为
S(标准)。 -
将存储位置设置为以下项:
US(美国)。 -
将
BUCKET_NAME替换为 唯一的存储桶名称。请勿在存储桶名称中添加敏感信息,因为存储桶命名空间是全局性的,公开可见。
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
将存储类别设置为
- 在后续部分中根据需要复制以下内容:
- 您的 Cloud Storage 存储桶名称。
- 您的 Google Cloud 项目 ID。如需查找此 ID,请参阅识别项目。
- 下载并安装 Java Development Kit (JDK) 版本 11。(Dataflow 继续支持版本 8。)验证
JAVA_HOME环境变量已设置并指向您的 JDK 安装。 - 下载 Apache Maven,并按照适用于您的具体操作系统的 Maven 安装指南进行安装。
获取流水线代码
Apache Beam SDK 是一个用于数据处理流水线的开源编程模型。您可使用 Apache Beam 程序定义这些流水线,还可以选择 Dataflow 等运行程序来运行流水线。
- 在您的 shell 或终端中,使用 Maven Archetype 插件在包含 Apache Beam SDK 的
WordCount示例的计算机上创建 Maven 项目:mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=2.71.0 \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false该命令会在当前目录下创建一个名为
word-count-beam的新目录。word-count-beam目录包含一个简单的pom.xml文件和一系列计算文本文件字数的示例流水线。 - 验证
word-count-beam目录包含pom.xml文件:Linux 或 macOS
cd word-count-beam/ ls
输出如下所示:
pom.xml src
Windows
cd word-count-beam/ dir
输出如下所示:
pom.xml src
- 验证您的 Maven 项目包含示例流水线:
Linux 或 macOS
ls src/main/java/org/apache/beam/examples/
输出如下所示:
DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
Windows
dir src/main/java/org/apache/beam/examples/
输出如下所示:
DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
如需查看这些示例中使用的 Apache Beam 概念的详细介绍,请参阅 Apache Beam WordCount 示例。后面部分中的说明使用 WordCount.java。
在本地运行流水线
- 在您的 shell 或终端中,从
word-count-beam目录在本地运行WordCount流水线:mvn compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--output=counts"输出文件的前缀为
counts并写入word-count-beam目录。它们包含输入文本中的不重复单词以及每个单词的出现次数。
在 Dataflow 服务上运行流水线
- 在您的 shell 或终端中,从
word-count-beam目录在 Dataflow 服务上构建并运行WordCount流水线:mvn -Pdataflow-runner compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--project=PROJECT_ID \ --gcpTempLocation=gs://BUCKET_NAME/temp/ \ --output=gs://BUCKET_NAME/output \ --runner=DataflowRunner \ --region=REGION"替换以下内容:
PROJECT_ID:您的 Google Cloud 项目 IDBUCKET_NAME:Cloud Storage 存储桶的名称REGION:Dataflow 区域,例如us-central1
查看结果
清理
为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。
删除项目
为了避免产生费用,最简单的方法是删除您为本快速入门创建的 Google Cloud 项目。
- 在 Google Cloud 控制台中,前往管理资源页面。
- 在项目列表中,选择要删除的项目,然后点击删除。
- 在对话框中输入项目 ID,然后点击关闭以删除项目。
逐个删除资源
如果您希望保留本快速入门中使用的 Google Cloud 项目,请单独删除各个资源:
- 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区页面。
- 点击要删除的存储分区对应的复选框。
- 如需删除存储分区,请点击删除,然后按照说明操作。
撤消授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.adminroles/dataflow.workerroles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
可选:撤消您创建的身份验证凭据,并删除本地凭据文件。
gcloud auth application-default revoke
-
可选:从 gcloud CLI 撤消凭据。
gcloud auth revoke
后续步骤
- 了解 Apache Beam 编程模型。
- 了解如何使用 Apache Beam 构建流水线。
- 完成 WordCount 和移动游戏示例。