Pub/Sub 源连接器会将消息从 Pub/Sub 流式传输到 Kafka。 这样一来,您就可以将 Pub/Sub 与基于 Kafka 的应用和数据流水线集成。
连接器从 Pub/Sub 订阅中读取消息,将每条消息转换为 Kafka 记录,并将记录写入 Kafka 主题。默认情况下,连接器会按如下方式创建 Kafka 记录:
- Kafka 记录键为
null。 - Kafka 记录值是 Pub/Sub 消息数据(以字节为单位)。
- Kafka 记录标头为空。
不过,您可以配置此行为。如需了解详情,请参阅配置连接器。
准备工作
在创建 Pub/Sub 来源连接器之前,请确保您已准备好以下各项:
具有订阅的 Pub/Sub 主题。
Kafka 集群中的 Kafka 主题。
连接集群。 创建 Connect 集群时,请将 Managed Service for Apache Kafka 集群设置为主 Kafka 集群。
所需的角色和权限
如需获得创建 Pub/Sub Source 连接器所需的权限,请让您的管理员为您授予包含 Connect 集群的项目的 Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM 角色。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
此预定义角色包含创建 Pub/Sub Source 连接器所需的权限。如需查看所需的确切权限,请展开所需权限部分:
所需权限
您需要具备以下权限才能创建 Pub/Sub 来源连接器:
-
在父级 Connect 集群上授予创建连接器的权限:
managedkafka.connectors.create
如需详细了解 Managed Kafka Connector Editor 角色,请参阅 Managed Service for Apache Kafka 预定义角色。
如果您的 Managed Service for Apache Kafka 集群与 Connect 集群位于同一项目中,则无需进一步授予权限。如果 Connect 集群位于其他项目中,请参阅在其他项目中创建 Connect 集群。
授予从 Pub/Sub 读取数据的权限
Managed Kafka 服务账号必须有权从 Pub/Sub 订阅中读取消息。向包含 Pub/Sub 订阅的项目中的服务账号授予以下 IAM 角色:
- Pub/Sub Subscriber (
roles/pubsub.subscriber) - Pub/Sub Viewer (
roles/pubsub.viewer)
受管 Kafka 服务账号采用以下格式:
service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com。
将 PROJECT_NUMBER 替换为项目编号。
创建 Pub/Sub 源连接器
控制台
在 Google Cloud 控制台中,前往连接集群页面。
点击要创建连接器的 Connect 集群。
点击创建连接器。
对于连接器名称,请输入一个字符串。
有关如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。
对于连接器插件,选择 Pub/Sub Source。
在 Cloud Pub/Sub 订阅列表中,选择一个 Pub/Sub 订阅。连接器从相应订阅中拉取消息。订阅以完整资源名称的形式显示:
projects/{project}/subscriptions/{subscription}。在 Kafka 主题列表中,选择写入消息的 Kafka 主题。
可选:在配置框中,添加配置属性或修改默认属性。如需了解详情,请参阅配置连接器。
选择任务重启政策。如需了解详情,请参阅任务重启政策。
点击创建。
gcloud
运行
gcloud managed-kafka connectors create命令:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE替换以下内容:
CONNECTOR_ID:连接器的 ID 或名称。 有关如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。连接器的名称不可变。
LOCATION:Connect 集群的位置。
CONNECT_CLUSTER_ID:创建连接器的 Connect 集群的 ID。
CONFIG_FILE:YAML 或 JSON 配置文件的路径。
以下是一个配置文件示例:
connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
替换以下内容:
PROJECT_ID:Pub/Sub 订阅所在的 Google Cloud项目的 ID。
PUBSUB_SUBSCRIPTION_ID:要从中拉取数据的 Pub/Sub 订阅的 ID。
KAFKA_TOPIC_ID:写入数据的 Kafka 主题的 ID。
必须提供 cps.project、cps.subscription 和 kafka.topic 配置属性。如需了解其他配置选项,请参阅配置连接器。
Terraform
您可以使用 Terraform 资源创建连接器。
如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令。
Go
在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC。
Java
在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC。
Python
在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC。
创建连接器后,您可以修改、删除、暂停、停止或重启连接器。
配置连接器
本部分介绍了一些可在连接器上设置的配置属性。
如需查看此连接器特有的属性的完整列表,请参阅 Pub/Sub Source 连接器配置。
拉取模式
拉取模式用于指定连接器检索 Pub/Sub 消息的方式。支持以下模式:
拉取模式(默认)。消息以批次形式拉取。如需启用此模式,请设置
cps.streamingPull.enabled=false.。如需配置批次大小,请设置cps.maxBatchSize属性。如需详细了解拉取模式,请参阅 Pull API。
Streaming Pull 模式。在从 Pub/Sub 检索消息时实现最大吞吐量和最低延迟时间。如需启用此模式,请设置
cps.streamingPull.enabled=true。如需详细了解流式拉取模式,请参阅 StreamingPull API。
如果启用了流式拉取,您可以通过设置以下配置属性来调整性能:
cps.streamingPull.flowControlBytes:每个任务的未处理消息字节数上限。cps.streamingPull.flowControlMessages:每个任务的未处理消息数上限。cps.streamingPull.maxAckExtensionMs:连接器延长订阅截止时间的最长时间(以毫秒为单位)。cps.streamingPull.maxMsPerAckExtension:连接器每次延长订阅截止时间的最长时间(以毫秒为单位)。cps.streamingPull.parallelStreams:从订阅中拉取消息的流数量。
Pub/Sub 端点
默认情况下,连接器使用全球 Pub/Sub 端点。如需指定端点,请将 cps.endpoint 属性设置为端点地址。如需详细了解端点,请参阅 Pub/Sub 端点。
Kafka 记录
Pub/Sub 源连接器会将 Pub/Sub 消息转换为 Kafka 记录。以下部分介绍了转换流程。
记录密钥
密钥转换器必须是 org.apache.kafka.connect.storage.StringConverter。
默认情况下,记录键为
null。如需使用 Pub/Sub 消息属性作为键,请将
kafka.key.attribute设置为相应属性的名称。例如kafka.key.attribute=username。如需使用 Pub/Sub 排序键作为键,请设置
kafka.key.attribute=orderingKey。
记录标头
默认情况下,记录标题为空。
如果 kafka.record.headers 为 true,Pub/Sub 消息特性将作为记录标头写入。如需添加排序键,请设置 cps.makeOrderingKeyAttribute=true。
记录值
如果 kafka.record.headers 为 true,或者 Pub/Sub 消息没有自定义属性,则记录值为消息数据(以字节数组的形式)。将值转换器设置为 org.apache.kafka.connect.converters.ByteArrayConverter。
否则,如果 kafka.record.headers 为 false 且消息至少包含一个自定义属性,连接器会将记录值写入为 struct。将值转换器设置为 org.apache.kafka.connect.json.JsonConverter。
struct 包含以下字段:
message:Pub/Sub 消息数据(以字节为单位)。每个 Pub/Sub 消息属性的字段。如需包含排序键,请设置
cps.makeOrderingKeyAttribute=true。
例如,假设消息具有 username 属性:
{
"message":"MESSAGE_DATA",
"username":"Alice"
}
如果 value.converter.schemas.enable 为 true,则 struct 同时包含载荷和架构:
{
"schema":
{
"type":"struct",
"fields": [
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"username"
}
],
"optional":false
},
"payload": {
"message":"MESSAGE_DATA",
"username":"Alice"
}
}
Kafka 分区
默认情况下,连接器会写入主题中的单个分区。如需指定连接器写入的分区数量,请设置 kafka.partition.count 属性。该值不得超过主题的分区数。
如需指定连接器如何将消息分配给分区,请设置 kafka.partition.scheme 属性。如需了解详情,请参阅 Pub/Sub Source 连接器配置。