使用可选的 Cloud Run 函数触发器将变更流式传输到 Pub/Sub

本教程介绍了如何使用 Bigtable change streams to Pub/Sub 模板,包括如何设置主题并配置模板。您可以视需要使用所选的编程语言创建由事件流触发的 Cloud Run 函数。

本教程适用于熟悉 Bigtable、编写代码和事件流服务的技术用户。

创建 Pub/Sub 主题

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 主题页面。

    打开“主题”

  2. 点击创建主题

  3. 将 ID 设置为 bigtable-change-stream-topic

  4. 选择使用架构

  5. 选择 Pub/Sub 架构下拉列表中,点击创建新的架构。系统会打开一个新标签页,您可以在其中定义架构。

    1. 将架构 ID 设置为 bigtable-change-stream-schema
    2. 将架构类型设置为 Avro
    3. 粘贴以下内容作为架构定义。如需详细了解该架构,请参阅模板文档页面
      {
          "name" : "ChangelogEntryMessage",
          "type" : "record",
          "namespace" : "com.google.cloud.teleport.bigtable",
          "fields" : [
            { "name" : "rowKey", "type" : "bytes"},
            {
              "name" : "modType",
              "type" : {
                "name": "ModType",
                "type": "enum",
                "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
            },
            { "name": "isGC", "type": "boolean" },
            { "name": "tieBreaker", "type": "int"},
            { "name": "columnFamily", "type": "string"},
            { "name": "commitTimestamp", "type" : "long"},
            { "name" : "sourceInstance", "type" : "string"},
            { "name" : "sourceCluster", "type" : "string"},
            { "name" : "sourceTable", "type" : "string"},
            { "name": "column", "type" : ["null", "bytes"]},
            { "name": "timestamp", "type" : ["null", "long"]},
            { "name": "timestampFrom", "type" : ["null", "long"]},
            { "name": "timestampTo", "type" : ["null", "long"]},
            { "name" : "value", "type" : ["null", "bytes"]}
        ]
      }
    
    1. 点击创建以创建架构。
  6. 关闭创建架构标签页,刷新架构列表,然后选择新定义的架构。

  7. 点击创建以创建主题。

可选:创建 Cloud Run 函数

您可能希望使用 Cloud Run 函数处理 Pub/Sub 数据流。

  1. bigtable-change-stream-topic 主题的详细信息页面上,点击触发 Cloud Functions 函数
  2. 函数名称字段中,输入名称 bt-ps-tutorial-function
  3. 源代码部分中,点击运行时下拉列表,然后选择所需的运行时和编程语言。系统会生成 hello world,用于输出传入的变更数据流。如需详细了解如何编写 Cloud Run functions 函数,请参阅相关文档。
  4. 其他所有字段均使用默认值。
  5. 点击部署函数

创建启用了变更数据流的表

  1. 在 Google Cloud 控制台中,前往 Bigtable 实例页面。

    转到实例

  2. 点击您在本教程中使用的实例的 ID。

    如果您没有可用实例,请在您附近的区域中使用默认配置创建实例

  3. 在左侧导航窗格中,点击

  4. 点击创建表

  5. 将表格命名为 change-streams-pubsub-tutorial

  6. 添加一个名为 cf 的列族。

  7. 选择启用变更数据流

  8. 点击创建

初始化数据流水线以捕获变更数据流

  1. 在 Bigtable 页面上,找到表 change-streams-pubsub-tutorial
  2. 变更数据流列中,点击连接
  3. 在对话框中,选择 Pub/Sub
  4. 点击创建 Dataflow 作业
  5. 在 Dataflow 创建作业页面上,将输出 Pub/Sub 主题名称设置为:bigtable-change-stream-topic
  6. 将 Bigtable 应用配置文件 ID 设置为 default
  7. 点击运行作业
  8. 等待作业状态为正在启动正在运行,然后再继续。作业排队后,大约需要 5 分钟才能完成。

向 Bigtable 写入一些数据

  1. 在 Cloud Shell 中,向 Bigtable 写入几行,以便变更日志可以将一些数据写入 Pub/Sub 数据流。只要在创建作业后写入数据,系统就会显示变更。您不必等待作业状态变为 running

    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user123 cf:col1=abc
    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user546 cf:col1=def
    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user789 cf:col1=ghi
    

在 Pub/Sub 中查看变更日志

  1. 在 Google Cloud 控制台中,前往 Pub/Sub 订阅页面。

    前往订阅页面

  2. 点击为主题 bigtable-change-stream-topic 自动创建的订阅。该订阅应命名为 bigtable-change-stream-topic-sub

  3. 前往消息标签页。

  4. 点击拉取

  5. 探索消息列表并查看您写入的数据。

    Pub/Sub 中的变更日志消息

可选:在 Cloud Run functions 日志中查看变更

如果您创建了 Cloud Run functions 函数,则可以在日志中查看变更。

  1. 在 Google Cloud 控制台中,前往 Cloud Run functions

    前往 Cloud Run functions

  2. 点击函数 bt-ps-tutorial-function

  3. 前往日志标签页。

  4. 确保将严重级别至少设置为信息,以便您可以查看日志。

  5. 探索日志并查看您写入的数据。

输出类似于以下内容:

Pub/Sub message: {"rowKey":"user789","modType":"SET_CELL","isGC":false,"tieBreaker":0,"columnFamily":"cf","commitTimestamp":1695653833064548,"sourceInstance":"YOUR-INSTANCE","sourceCluster":"YOUR-INSTANCE-c1","sourceTable":"change-streams-pubsub-tutorial","column":{"bytes":"col1"},"timestamp":{"long":1695653832278000},"timestampFrom":null,"timestampTo":null,"value":{"bytes":"ghi"}}