Pub/Sub 接收器连接器会将消息从 Kafka 主题流式传输到 Pub/Sub 主题。这样,您就可以将基于 Kafka 的应用与 Pub/Sub 集成,从而实现事件驱动型架构和实时数据处理。
准备工作
在创建 Pub/Sub Sink 连接器之前,请确保您已准备好以下各项:
为您的 Connect 集群创建 Managed Service for Apache Kafka 集群。这是与 Connect 集群关联的 Kafka 主集群。这也是构成连接器流水线一端的源集群。
创建 Connect 集群以托管 Pub/Sub 接收器连接器。
在源集群中创建并配置 Kafka 主题。数据从该 Kafka 主题移动到目标 Pub/Sub 主题。
所需的角色和权限
如需获得创建 Pub/Sub Sink 连接器所需的权限,请让管理员向您授予包含 Connect 集群的项目的以下 IAM 角色:
-
Managed Kafka Connector Editor (
roles/managedkafka.connectorEditor) -
Pub/Sub:
Pub/Sub Publisher (
roles/pubsub.publisher)
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
这些预定义角色包含创建 Pub/Sub Sink 连接器所需的权限。如需查看所需的确切权限,请展开所需权限部分:
所需权限
如需创建 Pub/Sub 接收器连接器,您需要以下权限:
-
在父级 Connect 集群上授予创建连接器的权限:
managedkafka.connectors.create
如需详细了解 Managed Kafka Connector Editor 角色,请参阅 Managed Service for Apache Kafka 预定义角色。
如果您的 Managed Service for Apache Kafka 集群与 Connect 集群位于同一项目中,则无需进一步授予权限。如果 Connect 集群位于其他项目中,请参阅在其他项目中创建 Connect 集群。
授予向 Pub/Sub 主题发布消息的权限
Connect 集群服务账号(格式为 service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com)需要有权将消息发布到 Pub/Sub 主题。为此,请向包含 Pub/Sub 主题的项目中的 Connect 集群服务账号授予 Pub/Sub Publisher 角色 (roles/pubsub.publisher)。
Pub/Sub 接收器连接器的工作原理
Pub/Sub 接收器连接器从一个或多个 Kafka 主题拉取消息,并将其发布到 Pub/Sub 主题。
下面详细介绍了 Pub/Sub 接收器连接器如何复制数据:
连接器会使用源集群内一个或多个 Kafka 主题中的消息。
连接器会将消息写入使用
cps.topic配置属性指定的目标 Pub/Sub 主题 ID。这是必需属性。连接器还需要使用
cps.project配置属性指定包含 Pub/Sub 主题的 Google Cloud 项目。这是一个必需的属性。连接器还可以选择使用通过
cps.endpoint属性指定的自定义 Pub/Sub 端点。默认端点为"pubsub.googleapis.com:443"。为了优化性能,连接器会在将消息发布到 Pub/Sub 之前对其进行缓冲。您可以配置
maxBufferSize、maxBufferBytes、maxDelayThresholdMs、maxOutstandingRequestBytes和maxOutstandingMessages来控制缓冲。Kafka 记录包含三个组成部分:标头、键、值。 连接器使用键和值转换器将 Kafka 消息数据转换为 Pub/Sub 所需的格式。使用结构或映射值架构时,
messageBodyName属性用于指定要用作 Pub/Sub 消息正文的字段或键。通过将
metadata.publish属性设置为true,连接器可以将 Kafka 主题、分区、偏移量和时间戳作为消息属性包含在内。连接器可以使用
headers.publish属性(设置为true)将 Kafka 消息标头作为 Pub/Sub 消息属性包含在内。连接器可以使用
orderingKeySource属性为 Pub/Sub 消息添加排序键。此值的选项包括"none"(默认)、"key"和"partition"。tasks.max属性用于控制连接器的并行级别。增加tasks.max可以提高吞吐量,但实际并行性受 Kafka 主题中分区数量的限制。
Pub/Sub 接收器连接器的属性
创建 Pub/Sub 接收器连接器时,您需要指定以下属性。
连接器名称
Connect 集群中连接器的唯一名称。 有关如何命名资源的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。
连接器插件类型
选择 Pub/Sub 接收器作为连接器插件类型。这决定了数据流的方向(从 Kafka 到 Pub/Sub)以及所用的具体连接器实现。如果您不使用界面来配置连接器,还必须指定连接器类。
Kafka 主题
连接器从中消费消息的 Kafka 主题。
您可以指定一个或多个主题,也可以使用正则表达式来匹配多个主题。例如,topic.* 可匹配以“topic”开头的所有主题。这些主题必须存在于与您的 Connect 集群关联的 Managed Service for Apache Kafka 集群中。
Pub/Sub 主题
连接器将消息发布到的现有 Pub/Sub 主题。确保 Connect 集群服务账号对主题的项目具有 roles/pubsub.publisher 角色,如准备工作中所述。
配置
您可以在此部分中指定其他连接器专用配置属性。
由于 Kafka 主题中的数据可以采用各种格式(例如 Avro、JSON 或原始字节),因此配置的关键部分是指定转换器。转换器会将 Kafka 主题中使用的格式的数据转换为 Kafka Connect 的标准化内部格式。然后,Pub/Sub 接收器连接器会获取此内部数据,并将其转换为 Pub/Sub 所需的格式,然后再写入。
如需详细了解转换器在 Kafka Connect 中的作用、支持的转换器类型和常见配置选项,请参阅转换器。
以下是 Pub/Sub 接收器连接器特有的一些配置:
cps.project:指定包含 Pub/Sub 主题的 Google Cloud 项目 ID。cps.topic:指定要将数据发布到的 Pub/Sub 主题。cps.endpoint:指定要使用的 Pub/Sub 端点。
如需查看此连接器特有的可用配置属性的列表,请参阅 Pub/Sub Sink 连接器配置。
创建 Pub/Sub 接收器连接器
在创建连接器之前,请查看Pub/Sub 接收器连接器的属性的相关文档。
控制台
在 Google Cloud 控制台中,前往连接集群页面。
点击要为其创建连接器的 Connect 集群。
系统会显示连接集群详情页面。
点击创建连接器。
系统会显示创建 Kafka 连接器页面。
对于连接器名称,请输入一个字符串。
有关如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。
对于连接器插件,选择 Pub/Sub Sink。
在主题下,选择选择一个 Kafka 主题列表或使用主题正则表达式。然后,选择或输入此连接器从中消费消息的 Kafka 主题。这些主题位于您的关联 Kafka 集群中。
在选择 Cloud Pub/Sub 主题部分,选择此连接器将消息发布到的 Pub/Sub 主题。主题以完整资源名称格式显示:
projects/{project}/topics/{topic}。(可选)在配置部分中配置其他设置。您可以在此处指定
tasks.max、key.converter和value.converter等属性,如上一部分中所述。选择任务重启政策。如需了解详情,请参阅任务重启政策。
点击创建。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
运行
gcloud managed-kafka connectors create命令:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE替换以下内容:
CONNECTOR_ID:连接器的 ID 或名称。 有关如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。连接器的名称不可变。
LOCATION:您创建连接器的位置。此位置必须与您创建 Connect 集群的位置相同。
CONNECT_CLUSTER_ID:创建连接器的 Connect 集群的 ID。
CONFIG_FILE:BigQuery Sink 连接器的 YAML 配置文件路径。
以下是 Pub/Sub Sink 连接器的配置文件示例:
connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector" name: "CPS_SINK_CONNECTOR_ID" tasks.max: "1" topics: "GMK_TOPIC_ID" value.converter: "org.apache.kafka.connect.storage.StringConverter" key.converter: "org.apache.kafka.connect.storage.StringConverter" cps.topic: "CPS_TOPIC_ID" cps.project: "GCP_PROJECT_ID"替换以下内容:
CPS_SINK_CONNECTOR_ID:Pub/Sub Sink 连接器的 ID 或名称。如需了解如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。 连接器的名称不可变。
GMK_TOPIC_ID:Managed Service for Apache Kafka 主题的 ID,Pub/Sub 接收器连接器从中读取数据。
CPS_TOPIC_ID:发布数据的 Pub/Sub 主题的 ID。
GCP_PROJECT_ID:Pub/Sub 主题所在的 Google Cloud项目的 ID。
Terraform
您可以使用 Terraform 资源创建连接器。
如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令。
Go
在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC。
Java
在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC。
Python
在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC。
创建连接器后,您可以修改、删除、暂停、停止或重启连接器。