本文档介绍了如何使用 Pub/Sub Group Kafka 连接器集成 Apache Kafka 和 Pub/Sub。
关于 Pub/Sub Group Kafka 连接器
Apache Kafka 是一个用于流式传输事件的开源平台。它通常用于分布式架构,可在松散耦合的各组件之间实现通信。Pub/Sub 是一种用于异步发送和接收消息的代管式服务。与 Kafka 类似,您可以使用 Pub/Sub 在云架构中的组件之间进行通信。
借助 Pub/Sub Group Kafka Connector,您可以集成这两个系统。 连接器 JAR 中包含以下连接器:
- 接收器连接器可从一个或多个 Kafka 主题读取记录,并将其发布到 Pub/Sub。
- 源连接器从 Pub/Sub 主题读取消息并将其发布到 Kafka。
以下是一些您可能会使用 Pub/Sub Group Kafka 连接器的场景:
- 您正在将基于 Kafka 的架构迁移到 Google Cloud。
- 您有一个前端系统,用于将事件存储在Google Cloud外部的 Kafka 中,但您也使用 Google Cloud 来运行一些需要接收 Kafka 事件的后端服务。
- 您从本地 Kafka 解决方案收集日志,并将其发送到Google Cloud 以进行数据分析。
- 您有一个使用 Google Cloud的前端系统,但您也使用 Kafka 在本地存储数据。
该连接器需要 Kafka Connect,这是一个用于在 Kafka 和其他系统之间流式传输数据的框架。如需使用连接器,您必须在 Kafka 集群旁边运行 Kafka Connect。
本文档假定您熟悉 Kafka 和 Pub/Sub。在阅读本文档之前,建议您先完成某个 Pub/Sub 快速入门。
Pub/Sub 连接器不支持 Google Cloud IAM 与 Kafka Connect ACL 之间的任何集成。
连接器使用入门
本部分将引导您完成以下任务:- 配置 Pub/Sub Group Kafka 连接器。
- 将事件从 Kafka 发送到 Pub/Sub。
- 将消息从 Pub/Sub 发送到 Kafka。
前提条件
安装 Kafka
按照 Apache Kafka 快速入门在本地机器上安装单节点 Kafka。完成快速入门中的以下步骤:
- 下载最新的 Kafka 版本并将其解压。
- 启动 Kafka 环境。
- 创建 Kafka 主题。
身份验证
Pub/Sub Group Kafka 连接器必须通过 Pub/Sub 身份验证,才能发送和接收 Pub/Sub 消息。如需设置身份验证,请执行以下步骤:
- 登录您的 Google Cloud 账号。如果您是 Google Cloud新手,请 创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
-
安装 Google Cloud CLI。
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init -
选择或创建项目所需的角色
- 选择项目:选择项目不需要特定的 IAM 角色,您可以选择已获授角色的任何项目。
-
创建项目:如需创建项目,您需要拥有 Project Creator 角色 (
roles/resourcemanager.projectCreator),该角色包含resourcemanager.projects.create权限。了解如何授予角色。
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID替换为您的 Google Cloud 项目名称。
-
为您的用户账号创建本地身份验证凭证:
gcloud auth application-default login
如果系统返回身份验证错误,并且您使用的是外部身份提供方 (IdP),请确认您已 使用联合身份登录 gcloud CLI。
-
向您的用户账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/pubsub.admingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
替换以下内容:
PROJECT_ID:您的项目 ID。USER_IDENTIFIER:您的用户 账号的标识符。例如,myemail@example.com。ROLE:您授予用户账号的 IAM 角色。
-
安装 Google Cloud CLI。
-
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init -
选择或创建项目所需的角色
- 选择项目:选择项目不需要特定的 IAM 角色,您可以选择已获授角色的任何项目。
-
创建项目:如需创建项目,您需要拥有 Project Creator 角色 (
roles/resourcemanager.projectCreator),该角色包含resourcemanager.projects.create权限。了解如何授予角色。
-
创建 Google Cloud 项目:
gcloud projects create PROJECT_ID
将
PROJECT_ID替换为您要创建的 Google Cloud 项目的名称。 -
选择您创建的 Google Cloud 项目:
gcloud config set project PROJECT_ID
将
PROJECT_ID替换为您的 Google Cloud 项目名称。
-
为您的用户账号创建本地身份验证凭证:
gcloud auth application-default login
如果系统返回身份验证错误,并且您使用的是外部身份提供方 (IdP),请确认您已 使用联合身份登录 gcloud CLI。
-
向您的用户账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/pubsub.admingcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
替换以下内容:
PROJECT_ID:您的项目 ID。USER_IDENTIFIER:您的用户 账号的标识符。例如,myemail@example.com。ROLE:您授予用户账号的 IAM 角色。
下载连接器 JAR
将连接器 JAR 文件下载到本地机器。如需了解详情,请参阅 GitHub README 中的获取连接器。
复制连接器配置文件
克隆或下载连接器的 GitHub 代码库。
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git cd java-pubsub-group-kafka-connector将
config目录的内容复制到 Kafka 安装的config子目录中。cp config/* [path to Kafka installation]/config/
这些文件包含连接器的配置设置。
更新 Kafka Connect 配置
- 前往包含您下载的 Kafka Connect 二进制文件的目录。
- 在 Kafka Connect 二进制文件目录中,通过文本编辑器打开名为
config/connect-standalone.properties的文件。 - 如果
plugin.path property被注释掉,请取消注释。 更新
plugin.path property以包含连接器 JAR 的路径。示例:
plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar将
offset.storage.file.filename属性设置为本地文件名。在独立模式下,Kafka 使用此文件来存储偏移量数据。示例:
offset.storage.file.filename=/tmp/connect.offsets
将事件从 Kafka 转发到 Pub/Sub
本部分介绍如何启动接收器连接器、向 Kafka 发布事件,然后从 Pub/Sub 读取转发的消息。
使用 Google Cloud CLI 创建具有订阅的 Pub/Sub 主题。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
替换以下内容:
- PUBSUB_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub 主题的名称。
- PUBSUB_SUBSCRIPTION:相应主题的 Pub/Sub 订阅的名称。
在文本编辑器中打开文件
/config/cps-sink-connector.properties。为以下属性添加值(在注释中标记为"TODO"):topics=KAFKA_TOPICS cps.project=PROJECT_ID cps.topic=PUBSUB_TOPIC
替换以下内容:
- KAFKA_TOPICS:要从中读取的 Kafka 主题的逗号分隔列表。
- PROJECT_ID:包含您的 Pub/Sub 主题的 Google Cloud 项目。
- PUBSUB_TOPIC:用于接收来自 Kafka 的消息的 Pub/Sub 主题。
在 Kafka 目录中,运行以下命令:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-sink-connector.properties按照 Apache Kafka 快速入门中的步骤将一些事件写入 Kafka 主题。
使用 gcloud CLI 从 Pub/Sub 读取事件。
gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack
将消息从 Pub/Sub 转发到 Kafka
本部分介绍了如何启动源连接器、向 Pub/Sub 发布消息,以及从 Kafka 读取转发的消息。
使用 gcloud CLI 创建具有订阅的 Pub/Sub 主题。
gcloud pubsub topics create PUBSUB_TOPIC gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC
替换以下内容:
- PUBSUB_TOPIC:Pub/Sub 主题的名称。
- PUBSUB_SUBSCRIPTION:Pub/Sub 订阅的名称。
在文本编辑器中打开名为
/config/cps-source-connector.properties的文件。为以下属性添加值(在注释中标记为"TODO"):kafka.topic=KAFKA_TOPIC cps.project=PROJECT_ID cps.subscription=PUBSUB_SUBSCRIPTION
替换以下内容:
- KAFKA_TOPIC:用于接收 Pub/Sub 消息的 Kafka 主题。
- PROJECT_ID:包含您的 Pub/Sub 主题的 Google Cloud 项目。
- PUBSUB_TOPIC:Pub/Sub 主题。
从 Kafka 目录中,运行以下命令:
bin/connect-standalone.sh \ config/connect-standalone.properties \ config/cps-source-connector.properties使用 gcloud CLI 将消息发布到 Pub/Sub。
gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
从 Kafka 读取消息。按照 Apache Kafka 快速入门中的步骤从 Kafka 主题读取消息。
消息转化
Kafka 记录包含一个键和一个值,它们都是长度可变的字节数组。Kafka 记录还可以选择性地包含标头(即键值对)。Pub/Sub 消息主要包含两个部分:消息正文和零个或多个键值属性。
Kafka Connect 使用转换器将键和值序列化为 Kafka,并从 Kafka 序列化键和值。 如需控制序列化,请在连接器配置文件中设置以下属性:
key.converter:用于序列化记录键的转换器。value.converter:用于序列化记录值的转换器。
Pub/Sub 消息的正文是一个 ByteString 对象,因此最有效的转换是直接复制载荷。因此,我们建议尽可能使用可生成原始数据类型(整数、浮点数、字符串或字节架构)的转换器,以防止对同一邮件正文进行反序列化和重新序列化。
从 Kafka 转换为 Pub/Sub
接收器连接器会将 Kafka 记录转换为 Pub/Sub 消息,如下所示:
- Kafka 记录键以名为
"key"的属性存储在 Pub/Sub 消息中。 - 默认情况下,连接器会舍弃 Kafka 记录中的所有标头。不过,如果您将
headers.publish配置选项设置为true,连接器会将标头写入为 Pub/Sub 属性。连接器会跳过任何超出 Pub/Sub 消息属性限制的标头。 - 对于整数、浮点数、字符串和字节架构,连接器会将 Kafka 记录值的字节直接传递到 Pub/Sub 消息正文中。
- 对于结构体架构,连接器会将每个字段写入为 Pub/Sub 消息的属性。例如,如果该字段为
{ "id"=123 },则生成的 Pub/Sub 消息将具有属性"id"="123"。字段值始终会转换为字符串。不支持将 Map 和结构体类型作为结构体中的字段类型。 - 对于映射架构,连接器会将每个键值对写入为 Pub/Sub 消息的属性。例如,如果映射为
{"alice"=1,"bob"=2},则生成的 Pub/Sub 消息具有两个属性,即"alice"="1"和"bob"="2"。键和值会转换为字符串。
结构体和映射架构还有一些其他行为:
您可以选择设置
messageBodyName配置属性,以指定某个特定的结构体字段或映射键作为信息正文。相应字段或键的值以ByteString形式存储在消息正文中。如果您未设置messageBodyName,则结构和映射架构的邮件正文为空。对于数组值,连接器仅支持原初数组类型。数组中的值序列会连接成一个
ByteString对象。
从 Pub/Sub 转换为 Kafka
源连接器会将 Pub/Sub 消息转换为 Kafka 记录,如下所示:
Kafka 记录键:默认情况下,该键设置为
null。您可以选择设置kafka.key.attribute配置选项,指定要用作键的 Pub/Sub 消息属性。在这种情况下,连接器会查找具有该名称的属性,并将记录键设置为该属性值。如果指定的属性不存在,则将记录键设置为null。Kafka 记录值。连接器会按如下方式写入记录值:
如果 Pub/Sub 消息没有自定义属性,连接器会使用
value.converter指定的转换器,将 Pub/Sub 消息正文直接写入 Kafka 记录值,作为byte[]类型。如果 Pub/Sub 消息具有自定义属性且
kafka.record.headers为false,则连接器会将结构体写入记录值。该结构体包含每个属性对应的一个字段,以及一个名为"message"的字段,其值为 Pub/Sub 消息正文(以字节形式存储):{ "message": "<Pub/Sub message body>", "<attribute-1>": "<value-1>", "<attribute-2>": "<value-2>", .... }在这种情况下,您必须使用与
struct架构兼容的value.converter,例如org.apache.kafka.connect.json.JsonConverter。如果 Pub/Sub 消息具有自定义属性,并且
kafka.record.headers为true,则连接器会将这些属性写入为 Kafka 记录标头。它使用value.converter指定的转换器,将 Pub/Sub 消息正文直接写入 Kafka 记录值(作为byte[]类型)。
Kafka 记录标头。默认情况下,除非您将
kafka.record.headers设置为true,否则标头为空。
配置选项
除了 Kafka Connect API 提供的配置之外,Pub/Sub Group Kafka Connector 还支持Pub/Sub 连接器配置中所述的接收器和源配置。
获取支持
如果您需要帮助,请创建支持服务工单。 如有一般性问题和讨论,请在 GitHub 代码库中创建问题。
后续步骤
- 了解 Kafka 与 Pub/Sub 之间的区别。
- 详细了解 Pub/Sub Group Kafka Connector。
- 请参阅 Pub/Sub Group Kafka 连接器的 GitHub 代码库。