使用 Dataflow 模板创建流处理流水线
本快速入门介绍如何使用 Google 提供的 Dataflow 模板创建流处理流水线。具体来说,本快速入门会以 Pub/Sub to BigQuery 模板为例。
Pub/Sub to BigQuery 模板是一种流处理流水线,可从 Pub/Sub 主题读取 JSON 格式的消息并将其写入 BigQuery 表中。
如需在 Google Cloud 控制台中直接遵循有关此任务的分步指导,请点击操作演示:
准备工作
在运行流水线之前,请完成以下步骤。
设置项目
- 登录您的 Google Cloud 账号。如果您是新手 Google Cloud, 请创建一个账号来评估我们的产品在 实际场景中的表现。新客户还可获享 $300 赠金,用于 运行、测试和部署工作负载。
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
所需的角色
如需完成本快速入门,您需要拥有以下 Identity and Access Management (IAM) 角色。
如需获得完成本快速入门所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:
-
BigQuery User (
roles/bigquery.user) -
Dataflow Admin (
roles/dataflow.admin) -
Service Account User (
roles/iam.serviceAccountUser) -
Storage Admin (
roles/storage.admin)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
您也可以通过自定义 角色或其他预定义 角色来获取所需的权限。
为确保 Compute Engine 默认服务帐号具有运行 Dataflow 作业所需的 权限, 请让您的管理员为 Compute Engine 默认服务帐号授予项目的以下 IAM 角色:
如果未能向正确的主账号授予这些角色,可能会导致权限错误。-
BigQuery Data Editor (
roles/bigquery.dataEditor) -
Dataflow Worker (
roles/dataflow.worker) -
Pub/Sub Editor (
roles/pubsub.editor) -
Storage Object Admin (
roles/storage.objectAdmin) -
Viewer (
roles/viewer)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
您的管理员也可以通过自定义角色或其他预定义角色为 Compute Engine 默认服务账号授予所需的权限。
创建 Cloud Storage 存储桶
您必须先创建一个 Cloud Storage 存储桶,然后才能运行流水线。
创建 Cloud Storage 存储桶:
- 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区 页面。
- 点击 创建。
- 在创建存储桶 页面上,输入您的存储桶信息。要转到下一步
,请点击继续。
- 在指定存储桶的名称中,输入唯一的存储桶名称。请勿在存储桶名称中添加敏感 信息,因为存储桶命名空间是全局性的,公开 可见。
-
在选择存储数据的位置 部分中,执行以下操作:
- 选择位置类型。
- 从位置类型下拉菜单中选择一个位置,用于永久存储存储桶的数据。
- 如需设置 跨存储桶复制,请选择
通过 Storage Transfer Service 添加跨存储桶复制 ,然后
按照以下步骤操作:
设置跨存储桶复制
- 在存储桶 菜单中,选择一个存储桶。
在复制设置 部分中, 点击配置 以配置 复制作业的设置。
系统会显示配置跨存储桶复制 窗格 显示。
- 如需按对象名称前缀过滤要复制的对象, 请输入要用于包含或排除对象的前缀,然后点击 添加前缀。
- 如需为复制的对象设置存储类别, 请从存储类别菜单中选择一个存储类别。 如果您跳过此步骤,则复制的对象会默认使用 目标存储桶的存储类别。
- 点击完成 。
-
在选择存储数据的位置 部分中,执行以下操作:
- 在设置默认类别 部分中,选择以下内容: 标准。
- 如需启用 分层命名空间,请在 针对数据密集型工作负载优化存储 部分中,选择 在此存储桶上启用分层命名空间。
- 在选择如何控制对对象的访问权限 部分中,选择 存储桶是否强制执行禁止公开访问, 然后为存储桶对象选择访问权限控制方法。
-
在选择如何保护对象数据 部分中,执行以下操作:
- 在数据保护 下,选择您要为存储桶设置的任何选项。
- 如需选择对象数据的加密方式,请展开 数据加密 部分 (),然后选择 数据加密 方法。
- 点击创建 。
在后续部分中根据需要复制以下内容:
- 您的 Cloud Storage 存储桶名称。
- 您的 Google Cloud 项目 ID。
如需查找此 ID,请参阅识别项目。
VPC 网络
默认情况下,每个新项目起初都有一个默认网络。如果您的项目的默认网络已停用或者已被删除,则您需要在自己的用户账号具备 Compute Network User 角色 (roles/compute.networkUser) 的项目中拥有网络。
创建 BigQuery 数据集和表
使用 Google Cloud 控制台为您的 Pub/Sub 主题创建具有适当架构的 BigQuery 数据集和表。
在此示例中,数据集的名称为 taxirides,表的名称为 realtime。如需创建此数据集和表,请按照以下步骤操作:
- 转到 BigQuery 页面。
前往 BigQuery - 在探索器面板中,点击您要创建数据集的项目旁边的 查看操作,然后点击创建数据集。
- 在创建数据集面板上,按照以下步骤操作:
- 在数据集 ID 部分,输入
taxirides。 数据集 ID 对于每个 Google Cloud 项目是唯一的。 - 对于位置类型,请选择多区域,然后选择美国(美国的多个区域)。公共数据集存储在
US多区域位置。为简单起见,请将数据集放在同一位置。 - 保留其他默认设置,然后点击创建数据集
- 在
探索器 面板中,展开您的项目。 - 在
taxirides数据集旁边,点击 查看操作,然后点击创建表。 - 在创建表面板上,按照以下步骤操作:
- 在来源部分,为基于以下数据创建表选择空表。
- 在目标位置部分的表中,输入
realtime。 - 在架构部分,点击以文字形式修改开关,并将以下架构定义粘贴到相应的框中:
ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp, meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
- 在分区和集群设置部分中,对于分区,选择时间戳字段。
- 保留其他默认设置,然后点击创建表。
运行流水线
使用 Google 提供的 Pub/Sub to BigQuery 模板运行流处理流水线。流水线会从该输入主题获取传入数据。
- 转到 Dataflow 作业页面。
转到作业 - 点击
从模板创建作业 。 - 输入
taxi-data作为 Dataflow 作业的名称。 - 对于 Dataflow 模板,选择 Pub/Sub to BigQuery 模板。
- 对于 BigQuery 输出表,输入以下内容:
PROJECT_ID:taxirides.realtime
将
PROJECT_ID替换为创建了 BigQuery 数据集的项目的 ID。 - 在 Optional source parameters 部分的 Input Pub/Sub topic 中,点击手动输入主题。
- 在对话框中,为主题名称输入以下内容,然后点击保存:
projects/pubsub-public-data/topics/taxirides-realtime
此公开提供的 Pub/Sub 主题基于纽约市出租车和豪华轿车委员会的开放数据集。以下是来自此主题的 JSON 格式的示例消息:
{ "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e", "point_idx": 217, "latitude": 40.75399, "longitude": -73.96302, "timestamp": "2021-03-08T02:29:09.66644-05:00", "meter_reading": 6.293821, "meter_increment": 0.029003782, "ride_status": "enroute", "passenger_count": 1 }
- 在临时位置部分,输入以下内容:
gs://BUCKET_NAME/temp/
将
BUCKET_NAME替换为您的 Cloud Storage 存储桶的名称。temp文件夹用于存储临时文件,如暂存的流水线作业。 - 如果您的项目没有默认网络,请输入网络和子网。如需了解详情,请参阅指定网络和子网。
- 点击运行作业。
查看结果
如需查看写入realtime 表的数据,请按照以下步骤操作:
转到 BigQuery 页面。
点击 编写新查询。随即会打开一个新的编辑器标签页。
SELECT * FROM `PROJECT_ID.taxirides.realtime` WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY) LIMIT 1000
将
PROJECT_ID替换为创建了 BigQuery 数据集的项目的 ID。数据最长可能需要五分钟才会开始显示在表中。点击运行。
查询将返回在过去 24 小时内添加到表中的行。您还可以使用标准 SQL 运行查询。
清理
为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。
删除项目
为了避免产生费用,最简单的方法是删除您为本快速入门创建的 Google Cloud 项目。- 在 Google Cloud 控制台中,前往 管理资源 页面。
- 在项目列表中,选择要删除的项目,然后点击删除。
- 在对话框中输入项目 ID,然后点击 关闭以删除项目。
逐个删除资源
如果您希望保留本快速入门中使用的 Google Cloud 项目,请单独删除各个资源:
- 转到 Dataflow 作业页面。
转到作业 - 从作业列表中选择您的流处理作业。
- 在导航中,点击停止。
- 在停止作业对话框中,取消或排空您的流水线,然后点击停止作业。
- 转到 BigQuery 页面。
前往 BigQuery - 在探索器面板中,展开您的项目。
- 在要删除的数据集旁边,点击 查看操作,然后点击打开。
- 在详细信息面板中,点击删除数据集,然后按照相应说明操作。
- 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区 页面。
- 点击要删除的存储分区对应的复选框。
- 如需删除存储桶, 请点击 删除,然后按照 说明操作。