使用 Dataflow 模板创建流处理流水线

本快速入门介绍如何使用 Google 提供的 Dataflow 模板创建流处理流水线。具体来说,本快速入门会以 Pub/Sub to BigQuery 模板为例。

Pub/Sub to BigQuery 模板是一种流处理流水线,可从 Pub/Sub 主题读取 JSON 格式的消息并将其写入 BigQuery 表中。


如需在 Google Cloud 控制台中直接遵循有关此任务的分步指导,请点击操作演示

操作演示


准备工作

在运行流水线之前,请完成以下步骤。

设置项目

  1. 登录您的 Google Cloud 账号。如果您是新手 Google Cloud, 请创建一个账号来评估我们的产品在 实际场景中的表现。新客户还可获享 $300 赠金,用于 运行、测试和部署工作负载。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

所需的角色

如需完成本快速入门,您需要拥有以下 Identity and Access Management (IAM) 角色。

如需获得完成本快速入门所需的权限,请让您的管理员为您授予项目的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义 角色或其他预定义 角色来获取所需的权限。

为确保 Compute Engine 默认服务帐号具有运行 Dataflow 作业所需的 权限, 请让您的管理员为 Compute Engine 默认服务帐号授予项目的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您的管理员也可以通过自定义角色或其他预定义角色为 Compute Engine 默认服务账号授予所需的权限。

创建 Cloud Storage 存储桶

您必须先创建一个 Cloud Storage 存储桶,然后才能运行流水线。

  1. 创建 Cloud Storage 存储桶:

    1. 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区 页面。

      进入“存储分区”

    2. 点击 创建
    3. 创建存储桶 页面上,输入您的存储桶信息。要转到下一步 ,请点击继续
      1. 指定存储桶的名称中,输入唯一的存储桶名称。请勿在存储桶名称中添加敏感 信息,因为存储桶命名空间是全局性的,公开 可见。
      2. 选择存储数据的位置 部分中,执行以下操作:
        1. 选择位置类型
        2. 位置类型下拉菜单中选择一个位置,用于永久存储存储桶的数据。
        3. 如需设置 跨存储桶复制,请选择 通过 Storage Transfer Service 添加跨存储桶复制 ,然后 按照以下步骤操作:

          设置跨存储桶复制

          1. 存储桶 菜单中,选择一个存储桶。
          2. 复制设置 部分中, 点击配置 以配置 复制作业的设置。

            系统会显示配置跨存储桶复制 窗格 显示。

            • 如需按对象名称前缀过滤要复制的对象, 请输入要用于包含或排除对象的前缀,然后点击 添加前缀
            • 如需为复制的对象设置存储类别, 请从存储类别菜单中选择一个存储类别。 如果您跳过此步骤,则复制的对象会默认使用 目标存储桶的存储类别。
            • 点击完成
      3. 选择存储数据的位置 部分中,执行以下操作:
        1. 设置默认类别 部分中,选择以下内容: 标准
        2. 如需启用 分层命名空间,请在 针对数据密集型工作负载优化存储 部分中,选择 在此存储桶上启用分层命名空间
      4. 选择如何控制对对象的访问权限 部分中,选择 存储桶是否强制执行禁止公开访问, 然后为存储桶对象选择访问权限控制方法
      5. 选择如何保护对象数据 部分中,执行以下操作:
        • 数据保护 下,选择您要为存储桶设置的任何选项。
          • 如需启用 软删除,请点击 软删除政策(用于数据恢复) 复选框, 然后指定您希望在删除对象后保留对象的天数。
          • 如需设置 对象版本控制,请点击 对象版本控制(用于版本控制) 复选框, 然后指定每个对象的最大版本数以及非当前版本过期的天数。
          • 如需在对象和存储分区上启用保留政策,请点击保留(用于合规性) 复选框,然后执行以下操作:
            • 如需启用 对象保留锁定,请点击 启用对象保留 复选框。
            • 如需启用 存储桶锁定,请点击 设置存储桶保留政策 复选框,然后为保留期限选择时间单位和时间长度。
        • 如需选择对象数据的加密方式,请展开 数据加密 部分 (),然后选择 数据加密 方法
    4. 点击创建
  2. 在后续部分中根据需要复制以下内容:

    • 您的 Cloud Storage 存储桶名称。
    • 您的 Google Cloud 项目 ID。

    如需查找此 ID,请参阅识别项目

VPC 网络

默认情况下,每个新项目起初都有一个默认网络。如果您的项目的默认网络已停用或者已被删除,则您需要在自己的用户账号具备 Compute Network User 角色 (roles/compute.networkUser) 的项目中拥有网络。

创建 BigQuery 数据集和表

使用 Google Cloud 控制台为您的 Pub/Sub 主题创建具有适当架构的 BigQuery 数据集和表。

在此示例中,数据集的名称为 taxirides,表的名称为 realtime。如需创建此数据集和表,请按照以下步骤操作:

  1. 转到 BigQuery 页面。
    前往 BigQuery
  2. 探索器面板中,点击您要创建数据集的项目旁边的 查看操作,然后点击创建数据集
  3. 创建数据集面板上,按照以下步骤操作:
    1. 数据集 ID 部分,输入 taxirides。 数据集 ID 对于每个 Google Cloud 项目是唯一的。
    2. 对于位置类型,请选择多区域,然后选择美国(美国的多个区域)。公共数据集存储在 US 多区域位置。为简单起见,请将数据集放在同一位置。
    3. 保留其他默认设置,然后点击创建数据集
  4. 探索器面板中,展开您的项目。
  5. taxirides 数据集旁边,点击 查看操作,然后点击创建表
  6. 创建表面板上,按照以下步骤操作:
    1. 来源部分,为基于以下数据创建表选择空表
    2. 目标位置部分的中,输入 realtime
    3. 架构部分,点击以文字形式修改开关,并将以下架构定义粘贴到相应的框中:
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. 分区和集群设置部分中,对于分区,选择时间戳字段。
  7. 保留其他默认设置,然后点击创建表

运行流水线

使用 Google 提供的 Pub/Sub to BigQuery 模板运行流处理流水线。流水线会从该输入主题获取传入数据。

  1. 转到 Dataflow 作业页面。
    转到作业
  2. 点击从模板创建作业
  3. 输入 taxi-data 作为 Dataflow 作业的名称
  4. 对于 Dataflow 模板,选择 Pub/Sub to BigQuery 模板。
  5. 对于 BigQuery 输出表,输入以下内容:
    PROJECT_ID:taxirides.realtime

    PROJECT_ID 替换为创建了 BigQuery 数据集的项目的 ID。

  6. Optional source parameters 部分的 Input Pub/Sub topic 中,点击手动输入主题
  7. 在对话框中,为主题名称输入以下内容,然后点击保存
    projects/pubsub-public-data/topics/taxirides-realtime

    此公开提供的 Pub/Sub 主题基于纽约市出租车和豪华轿车委员会的开放数据集。以下是来自此主题的 JSON 格式的示例消息:

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  8. 临时位置部分,输入以下内容:
    gs://BUCKET_NAME/temp/

    BUCKET_NAME 替换为您的 Cloud Storage 存储桶的名称。temp 文件夹用于存储临时文件,如暂存的流水线作业。

  9. 如果您的项目没有默认网络,请输入网络子网。如需了解详情,请参阅指定网络和子网
  10. 点击运行作业

查看结果

如需查看写入 realtime 表的数据,请按照以下步骤操作:

  1. 转到 BigQuery 页面。

    前往 BigQuery

  2. 点击 编写新查询。随即会打开一个新的编辑器标签页。

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    PROJECT_ID 替换为创建了 BigQuery 数据集的项目的 ID。数据最长可能需要五分钟才会开始显示在表中。

  3. 点击运行

    查询将返回在过去 24 小时内添加到表中的行。您还可以使用标准 SQL 运行查询。

清理

为避免因本页中使用的资源导致您的 Google Cloud 账号产生费用,请按照以下步骤操作。

删除项目

为了避免产生费用,最简单的方法是删除您为本快速入门创建的 Google Cloud 项目。

  1. 在 Google Cloud 控制台中,前往 管理资源 页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击 关闭以删除项目。

逐个删除资源

如果您希望保留本快速入门中使用的 Google Cloud 项目,请单独删除各个资源:

  1. 转到 Dataflow 作业页面。
    转到作业
  2. 从作业列表中选择您的流处理作业。
  3. 在导航中,点击停止
  4. 停止作业对话框中,取消排空您的流水线,然后点击停止作业
  5. 转到 BigQuery 页面。
    前往 BigQuery
  6. 探索器面板中,展开您的项目。
  7. 在要删除的数据集旁边,点击 查看操作,然后点击打开
  8. 在详细信息面板中,点击删除数据集,然后按照相应说明操作。
  9. 在 Google Cloud 控制台中,前往 Cloud Storage 存储分区 页面。

    进入“存储分区”

  10. 点击要删除的存储分区对应的复选框。
  11. 如需删除存储桶, 请点击 删除,然后按照 说明操作。

后续步骤