本文档简要介绍了Google Cloud中的 Kafka Connect 连接器。了解何时使用每种连接器类型来管理和集成数据流。
这些连接器使用 Kafka Connect 框架将 Apache Kafka 与其他应用集成。它们用于在 Kafka 集群和应用之间注入和复制数据。可用的连接器类型包括:
MirrorMaker 2.0 连接器
来源连接器
检查点连接器
心跳连接器
BigQuery Sink 连接器
Cloud Storage 接收器连接器
Pub/Sub 来源连接器
Pub/Sub 接收器连接器
MirrorMaker 2.0 连接器专门用于在 Kafka 集群之间进行数据复制和灾难恢复。它们有助于将数据从一个 Kafka 集群镜像到另一个 Kafka 集群,从而实现高可用性和容错性。
MirrorMaker 2.0 连接器可以在 Managed Service for Apache Kafka 集群与其他 Managed Service for Apache Kafka 集群或自管理的 Kafka 集群之间建立连接。
其他 Sink 和 Source 连接器用于将 Kafka 与各种Google Cloud 服务集成。借助这些连接器,您可以在 Managed Service for Apache Kafka 集群与 Google Cloud 服务(例如 BigQuery、Cloud Storage 或 Pub/Sub)之间转移数据。
准备工作
在探索和创建连接器之前,请确保您了解以下信息并满足以下前提条件:
具备 Kafka Connect 和 Connect 集群的实际运用知识。 您必须先创建 Connect 集群,然后才能部署连接器。
对于接收器和源连接器,您需要了解 BigQuery 表、Cloud Storage 存储分区或 Pub/Sub 主题和订阅,具体取决于您配置的连接器类型。
熟悉 YAML 或 JSON 配置文件,因为连接器是使用这些格式配置的。
何时使用 MirrorMaker 2.0
在以下场景中使用 MirrorMaker 2.0 连接器:
迁移数据:将 Kafka 工作负载迁移到新的 Managed Service for Apache Kafka 集群。
从灾难中恢复:创建备份集群,以确保在发生故障时业务能够持续运行。
汇总数据:将多个 Kafka 集群中的数据整合到中央 Managed Service for Apache Kafka 集群中,以用于分析。
MirrorMaker 2.0 主要功能
- 复制所有必需的组件,包括主题、数据、配置、具有偏移量的消费群组和 ACL。
- 在目标集群中保持相同的分区方案,从而简化应用的过渡。
- 自动检测并复制新主题和分区,最大限度地减少手动配置。
- 提供基本指标(例如端到端复制延迟时间),以便您跟踪复制过程的健康状况和性能。
- 即使在数据量很大的情况下,也能确保可靠运行,并且可以横向扩缩以应对不断增加的工作负载。
- 使用内部主题进行偏移同步、检查点和心跳。这些主题具有可配置的复制因子(例如
offset.syncs.topic.replication.factor),以确保高可用性和容错能力。
使用 MirrorMaker 2.0 Source 连接器
MirrorMaker 2.0 源连接器会将某个 Kafka 集群(源)的主题和数据复制到另一个 Kafka 集群(目标)。
| 来源 | 目标 |
|---|---|
| Managed Service for Apache Kafka 集群 | Managed Service for Apache Kafka 集群 |
| Managed Service for Apache Kafka 集群 | 外部或自行管理的 Kafka 集群 |
| 外部或自行管理的 Kafka 集群 | Managed Service for Apache Kafka 集群 |
MirrorMaker 2.0 源连接器支持以下迁移方案:
将数据从外部或自行管理的 Kafka 集群复制或迁移到 Managed Service for Apache Kafka 集群
将数据从 Managed Service for Apache Kafka 集群复制或迁移到外部或自行管理的 Kafka 集群。
跨区域复制 Kafka 数据,以满足灾难恢复和高可用性要求。
使用 MirrorMaker 2.0 检查点连接器
MirrorMaker 2.0 检查点连接器的使用是可选的。 它会复制消费者偏移量,这些偏移量表示上次成功消费的消息。此过程可确保目标集群上的消费者能够从与源集群相同的点恢复处理。
MirrorMaker 2.0 源连接器不需要此连接器即可正常运行。只有在需要同步 ConsumerGroup 状态以在从源集群切换到目标集群期间实现最短停机时间时,才需要此连接器。如果您只需要源数据的副本,则不需要此连接器。
在以下使用情形下,请使用 MirrorMaker 2.0 检查点连接器:
灾难恢复,以在集群之间保持一致的消费者状态并实现无缝故障切换。
在关键场景中保留消费者的进度。
使用 MirrorMaker 2.0 检测信号连接器
MirrorMaker 2.0 检测信号连接器是一种可选组件,可在源 Kafka 集群上定期生成检测信号消息。连接器会将这些消息写入专用主题,该主题通常命名为 heartbeats。
您可以配置 MirrorMaker 2.0 源连接器,以将 heartbeats 主题复制到目标集群。通过在目标集群上观察此复制的主题,您可以监控主题复制流程的状态和性能。这样一来,即使没有生成或复制其他数据,您也可以验证集群之间的连接和数据流。
单独部署 Heartbeat 连接器不会自动监控复制健康状况。如需将其用于监控,您必须复制 heartbeats 主题,然后在目标集群上观察其存在情况和及时性,或者使用会消耗这些心跳的监控工具。
MirrorMaker 2.0 Source 连接器不需要 Heartbeat 连接器即可正常运行。在以下使用情形下,请使用 MirrorMaker 2.0 检测信号连接器:
监控 MirrorMaker 2 复制的健康状况和状态。
使用生成的心跳和可用指标在 Cloud Monitoring 中配置提醒,以便在复制或心跳停止时收到通知。
使用接收器连接器
接收器连接器会将数据从 Kafka 主题导出到其他系统。
使用 BigQuery 接收器连接器
BigQuery 接收器连接器会将数据从 Kafka 主题流式传输到 BigQuery 表。
在以下使用情形下使用 BigQuery Sink 连接器:
数据仓库,用于将流式数据加载到 BigQuery 中以进行分析和报告。
填充为实时信息中心提供支持的 BigQuery 表。
使用 Cloud Storage 接收器连接器
Cloud Storage 接收器连接器会将数据从 Kafka 主题流式传输到 Cloud Storage 存储分区。
Cloud Storage Sink 连接器适用于以下使用情形:
数据湖注入,用于将 Kafka 数据存储在数据湖中,以便进行长期归档和批处理。
归档数据以满足监管要求。
使用 Pub/Sub 接收器连接器
Pub/Sub 接收器连接器会将消息从 Kafka 主题流式传输到 Pub/Sub 主题。
在以下使用情形下,请使用 Pub/Sub 接收器连接器:
服务集成,用于将数据从 Kafka 发送到其他 Google Cloud从 Pub/Sub 使用数据的服务或应用。
根据处理后的数据触发实时通知或操作。
使用 Source 连接器
源连接器会将其他系统中的数据导入 Kafka 主题。
使用 Pub/Sub 来源连接器
Pub/Sub 源连接器会将消息从 Pub/Sub 订阅流式传输到 Kafka 主题。
在以下使用情形下使用 Pub/Sub Source 连接器:
实时数据注入,从云服务或其他应用提取数据并发布到 Pub/Sub 中,以便在 Kafka 中进行流处理。
事件驱动型架构,根据发布到 Pub/Sub 的事件触发基于 Kafka 的处理。
任务重启政策
您可以设置连接器的任务重启政策,该政策决定了发生故障时的行为。连接器支持以下政策:
永不重启。连接器不会重启失败的任务。此政策是默认行为。这对于调试或在发生错误后需要人工干预的情况下非常有用。
使用指数退避算法重新启动。连接器会在延迟(称为退避时间段)后重新启动失败的任务。每次后续失败都会使延迟时间呈指数级增长。建议将此政策用于大多数生产工作负载。
如果您使用指数退避算法政策,还需为最小退避和最大退避设置值。退避时长下限应大于 60 秒,退避时长上限应小于 7200 秒。
转换和谓词
Kafka Connect 支持默认的 Kafka 转换和谓词。
您可以在连接器配置中指定配置。例如,如需配置接收器连接器以忽略包含 DoNotProcess 标头键的消息,您需要向连接器添加以下配置:
transforms=dropMessage
transforms.dropMessage.type=org.apache.kafka.connect.transforms.Filter
transforms.dropMessage.predicate=hasKey
predicates=hasKey
predicates.hasKey.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.hasKey.name=DoNotProcess
此配置会执行以下操作:
配置一个名为
hasKey且类型为org.apache.kafka.connect.transforms.predicates.HasHeaderKey的谓词。 此谓词匹配包含键为DoNotProcess的标头的所有消息。配置名为
dropMessage且类型为org.apache.kafka.connect.transforms.Filter的转换。 此转换会舍弃与配置的谓词匹配的所有消息。将转换与谓词
hasKey相关联。这样可确保只有包含DoNotProcess标头键的消息会被转换丢弃。