使用可选的 Cloud Run 函数触发器将变更流式传输到 Pub/Sub

本教程介绍了如何使用 Bigtable 变更数据流流到 Pub/Sub 模板,包括如何设置主题并配置模板。您可以视需要使用所选的编程语言创建由事件流触发的 Cloud Run 函数。

本教程适用于熟悉 Bigtable、编写代码和事件流服务的技术用户。

目标

本教程介绍了如何执行以下操作:

  • 创建启用了变更数据流的 Bigtable 表。
  • 创建一个具有 Bigtable 变更数据流架构的 Pub/Sub 主题。
  • 使用模板将 Bigtable 变更数据流部署到 Dataflow 上的 Pub/Sub 流水线。
  • 直接在 Pub/Sub 中或在 Cloud Run 函数日志中查看事件流。

费用

在本文档中,您将使用的以下收费组件: Google Cloud

您可使用 价格计算器 根据您的预计使用情况来估算费用。

新 Google Cloud 用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

    登录您的 Google Cloud 账号。如果您是 Google Cloud的新用户, 请创建一个账号,以便在 真实场景中评估我们产品的性能。新客户还可获享 $300 赠金,用于 运行、测试和部署工作负载。

    In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

    Verify that billing is enabled for your Google Cloud project.

    Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, Pub/Sub, Cloud Run functions, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

    In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

    Verify that billing is enabled for your Google Cloud project.

    Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, Pub/Sub, Cloud Run functions, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

    在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在控制台的底部启动,并显示命令行提示符。 Google Cloud Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境 。该会话可能需要几秒钟来完成初始化。

  1. 更新并安装 cbt CLI 。
    gcloud components update
    gcloud components install cbt

创建 Pub/Sub 主题

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 主题 页面。

    打开“主题”。

  2. 点击创建主题

  3. 将 ID 设置为 bigtable-change-stream-topic

  4. 选择使用架构

  5. 选择 Pub/Sub 架构下拉列表中,点击创建新的架构。系统会打开一个新标签页,您可以在其中定义架构。

    1. 将架构 ID 设置为 bigtable-change-stream-schema
    2. 将架构类型设置为 Avro。 Avro
    3. 粘贴以下内容作为架构定义。如需详细了解 架构,请参阅模板文档 页面
      {
          "name" : "ChangelogEntryMessage",
          "type" : "record",
          "namespace" : "com.google.cloud.teleport.bigtable",
          "fields" : [
            { "name" : "rowKey", "type" : "bytes"},
            {
              "name" : "modType",
              "type" : {
                "name": "ModType",
                "type": "enum",
                "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
            },
            { "name": "isGC", "type": "boolean" },
            { "name": "tieBreaker", "type": "int"},
            { "name": "columnFamily", "type": "string"},
            { "name": "commitTimestamp", "type" : "long"},
            { "name" : "sourceInstance", "type" : "string"},
            { "name" : "sourceCluster", "type" : "string"},
            { "name" : "sourceTable", "type" : "string"},
            { "name": "column", "type" : ["null", "bytes"]},
            { "name": "timestamp", "type" : ["null", "long"]},
            { "name": "timestampFrom", "type" : ["null", "long"]},
            { "name": "timestampTo", "type" : ["null", "long"]},
            { "name" : "value", "type" : ["null", "bytes"]}
        ]
      }
    
    1. 点击创建 以创建架构。
  6. 关闭创建架构 标签页,刷新架构列表,然后选择新定义的架构。

  7. 点击创建 以创建主题。

可选:创建 Cloud Run 函数

您可能希望使用 Cloud Run 函数处理 Pub/Sub 数据流。

  1. bigtable-change-stream-topic 主题的详细信息 页面上,点击触发 Cloud Functions 函数
  2. 函数名称 字段中,输入名称 bt-ps-tutorial-function
  3. 源代码部分中,点击运行时下拉列表,然后选择所需的运行时和编程语言。系统会生成 hello world,用于输出传入的变更数据流。如需详细了解如何编写 Cloud Run 函数,请参阅 文档。
  4. 其他所有字段均使用默认值。
  5. 点击部署函数

创建启用了变更数据流的表

  1. 在 Google Cloud 控制台中,前往 Bigtable 实例 页面。

    转到实例

  2. 点击您在本教程中使用的实例的 ID。

    如果您没有可用实例,请在您附近的区域中使用默认配置创建实例

  3. 在左侧导航窗格中,点击

  4. 点击创建表

  5. 将表命名为 change-streams-pubsub-tutorial

  6. 添加一个名为 cf 的列族。

  7. 选择启用变更数据流

  8. 点击创建

初始化数据流水线以捕获变更数据流

  1. 在 Bigtable 页面上,找到表 change-streams-pubsub-tutorial
  2. 变更数据流列中,点击连接
  3. 在对话框中,选择 Pub/Sub
  4. 点击创建 Dataflow 作业
  5. 在 Dataflow 创建作业 页面上,将输出 Pub/Sub 主题名称设置为:bigtable-change-stream-topic
  6. 将 Bigtable 应用配置文件 ID 设置为 default
  7. 点击运行作业
  8. 等待作业状态为正在启动正在运行,然后再继续。作业排队后大约需要 5 分钟。

向 Bigtable 写入一些数据

  1. 在 Cloud Shell 中,向 Bigtable 写入几行,以便变更日志可以将一些数据写入 Pub/Sub 数据流。只要在创建作业后写入数据,系统就会显示变更。您无需等待作业状态变为 running

    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user123 cf:col1=abc
    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user546 cf:col1=def
    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user789 cf:col1=ghi
    

在 Pub/Sub 中查看变更日志

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 订阅 页面。

    前往订阅页面。

  2. 点击为主题 bigtable-change-stream-topic 自动创建的订阅。该订阅应命名为 bigtable-change-stream-topic-sub

  3. 前往消息标签页。

  4. 点击拉取

  5. 探索消息列表并查看您写入的数据。

    Pub/Sub 中的变更日志消息

可选:在 Cloud Run 函数日志中查看变更

如果您创建了 Cloud Run 函数,则可以在日志中查看变更。

  1. 在 Google Cloud 控制台中,前往 Cloud Run 函数

    前往 Cloud Run 函数

  2. 点击函数 bt-ps-tutorial-function

  3. 前往日志标签页。

  4. 确保将严重级别至少设置为信息,以便您可以查看日志。

  5. 探索日志并查看您写入的数据。

输出类似于以下内容:

Pub/Sub message: {"rowKey":"user789","modType":"SET_CELL","isGC":false,"tieBreaker":0,"columnFamily":"cf","commitTimestamp":1695653833064548,"sourceInstance":"YOUR-INSTANCE","sourceCluster":"YOUR-INSTANCE-c1","sourceTable":"change-streams-pubsub-tutorial","column":{"bytes":"col1"},"timestamp":{"long":1695653832278000},"timestampFrom":null,"timestampTo":null,"value":{"bytes":"ghi"}}

清理

为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除 Bigtable 表

  1. 在 Google Cloud 控制台中,前往 Bigtable 实例 页面。

    转到实例

  2. 点击您在本教程中使用的实例的 ID。

  3. 在左侧导航窗格中,点击

  4. 找到 change-streams-pubsub-tutorial 表。

  5. 点击修改

  6. 清除启用变更数据流

  7. 点击保存

  8. 打开该表的溢出菜单。

  9. 点击删除,然后输入表名称进行确认。

停止变更数据流流水线

  1. 在 Google Cloud 控制台中,前往 Dataflow 作业 页面。

    打开“作业”

  2. 从作业列表中选择您的流处理作业。

  3. 在导航中,点击停止

  4. 停止作业 对话框中,取消流水线,然后点击停止作业

删除 Pub/Sub 主题和订阅

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 主题 页面。

    打开“主题”。

  2. 选择 bigtable-change-stream-topic 主题。

  3. 点击删除并确认。

  4. 点击边栏中的订阅

  5. 选择 bigtable-change-stream-topic-sub 订阅。

  6. 点击删除 并确认。

删除 Cloud Run 函数

  1. 在 Google Cloud 控制台中,前往 Cloud Run 函数

    前往 Cloud Run 函数

  2. 选择 bt-ps-tutorial-function 函数。

  3. 点击删除并确认。

后续步骤