对于 Kafka 开发者而言,Kafka Connect 是首选的数据集成工具。它提供了一个框架,用于将 Kafka 与数据库、消息队列和文件系统等外部系统连接起来。
Kafka Connect 提供了一组由 Google Cloud审核和维护的精选内置连接器插件。这些连接器插件会自动修补和升级,从而简化维护并确保兼容性。Google Cloud 还提供内置的监控和日志记录功能,以维护流水线的健康状况。
Kafka Connect API 作为 Google Cloud Managed Service for Apache Kafka 服务的一部分提供。这些 API 可通过 managedkafka.googleapis.com 访问,并已集成到 Google Cloud 控制台和客户端库中。如需管理 Kafka Connect,您可以使用 Google Cloud 控制台、gcloud CLI、Managed Kafka API、云客户端库或 Terraform。
Kafka Connect 应用场景
Kafka Connect 支持在 Managed Service for Apache Kafka 集群与各种其他系统之间进行数据集成。以下是一些关键应用场景:
将现有 Kafka 部署迁移到 Managed Service for Apache Kafka。
将 Managed Service for Apache Kafka 集群复制到另一个区域,以实现灾难恢复。
将数据从 Managed Service for Apache Kafka 流式传输到 BigQuery、Cloud Storage、Pub/Sub。
Kafka Connect 术语
以下部分讨论了某些关键的 Kafka Connect 组件。
Connect 集群
Connect 集群是 Kafka Connect 的分布式部署,包含预打包的连接器插件和配置。每个 Connect 集群都与一个主 Managed Service for Apache Kafka 集群相关联。此主集群存储在 Connect 集群上运行的连接器的状态。
一般来说,主 Managed Service for Apache Kafka 集群也充当关联的 Connect 集群上运行的所有源连接器的目标,以及所有接收器连接器的源。
单个 Managed Service for Apache Kafka 集群可以有多个 Connect 集群。如果运行的是 MirrorMaker 2.0,Connect 集群可以连接到非主 Managed Service for Apache Kafka 集群或自行管理的 Kafka 集群,以读取或写入主题数据。此过程可实现不同集群之间的主题复制。
从资源模型的角度来看,Connect 集群是与 Managed Service for Apache Kafka 集群分开的资源。
假设您有一个 Managed Service for Apache Kafka 集群,用于存储网站流量数据。您希望将这些数据流式传输到 BigQuery 中进行分析。您可以创建一个 Connect 集群,并使用 BigQuery 接收器连接器将数据从 Kafka 主题移至 BigQuery。此 Connect 集群与您的 Managed Service for Apache Kafka 集群相关联,并将其作为主集群。
连接器插件
用于创建连接器的软件包。您可以将其视为定义连接器逻辑的代码。
连接器可以是源连接器或接收器连接器。源连接器将数据从来源写入 Managed Service for Apache Kafka 集群。
接收器连接器将数据从 Managed Service for Apache Kafka 集群写入接收器。
Managed Service for Apache Kafka 支持多种类型的内置连接器插件,您可以配置这些插件来创建连接器。这些连接器可与 Pub/Sub 或 BigQuery 等常用服务集成。这些连接器插件如下:
BigQuery Sink 连接器插件
Cloud Storage Sink 连接器插件
Pub/Sub 来源连接器插件
Pub/Sub 接收器连接器插件
MirrorMaker 2.0 连接器插件
连接器
连接器是特定 Connect 集群中连接器插件的运行实例。您可以从同一连接器插件创建多个连接器,每个连接器都有自己的特定配置。配置示例包括不同的身份验证详细信息和操作设置。 连接器在 Connect 集群中部署、配置和管理。您可以启动、停止、暂停、重启该服务,还可以更新其配置。
连接器的各个组成部分将在后续部分中讨论。
转化者数量
转换器是 Kafka Connect 中的关键组件,负责序列化和反序列化。它们在 Kafka 主题(例如 Avro 或 JSON 格式)上找到的原始字节线格式的数据与 Kafka Connect 的内部结构化数据表示形式之间转换数据。
转化者的角色
对于 Sink 连接器,转换器会将主题的线路格式中的数据反序列化为 Kafka Connect 的内部结构化数据表示形式,然后连接器会使用该表示形式将数据写入目标系统。
对于源连接器,转换器会将 Kafka Connect 的内部结构化数据表示形式(由连接器提供)中的数据序列化为 Kafka 主题的指定网络格式。
此内部格式可作为通用表示形式,实现各种中间处理步骤。这些步骤包括过滤器、谓词、转换和转换器等原语,所有这些原语都以这种统一的内部格式运行。通过使用抽象的内部格式,这些中间步骤的逻辑与特定的输入或输出数据格式保持独立。
当您需要与数据互动,而不仅仅是传递数据时,就需要使用转换器。具体而言,如果您需要以精细的结构感知方式执行中间处理步骤(例如谓词或转换),则需要转换器。
如果您只是想将字节字符串(即使是 JSON)从来源移至 Kafka,而无需进行任何操作,则不需要转换器。
在连接器配置中,如果您未指定键和值转换器,连接器会使用默认值 ByteArrayConverter。org.apache.kafka.connect.converters.ByteArrayConverter 值不会对数据应用任何转换,而是以原始格式传递数据。
支持的转化器
在此版本中, Google Cloud 支持以下内置转换器:
org.apache.kafka.connect.converters.ByteArrayConverter:将数据转换为字节数组以及从字节数组转换数据。这是默认转换器。它以原始底层字节的形式通过连接器传递数据。org.apache.kafka.connect.json.JsonConverter:将数据转换为 JSON 格式,或将 JSON 格式的数据转换为其他格式。org.apache.kafka.connect.storage.StringConverter:将数据转换为字符串格式或从字符串格式转换数据。org.apache.kafka.connect.converters.ByteArrayConverter:将数据转换为字节数组以及从字节数组转换数据。org.apache.kafka.connect.converters.DoubleConverter:将数据转换为 Double 格式或从 Double 格式转换数据。org.apache.kafka.connect.converters.FloatConverter:将数据转换为浮点格式以及从浮点格式转换数据。org.apache.kafka.connect.converters.IntegerConverter:将数据转换为整数格式,或将数据从整数格式转换为其他格式。org.apache.kafka.connect.converters.LongConverter:将数据转换为 Long 格式以及从 Long 格式转换数据。org.apache.kafka.connect.converters.ShortConverter:将数据转换为 Short 格式以及从 Short 格式转换数据。org.apache.kafka.connect.converters.BooleanConverter:将数据转换为布尔值格式以及从布尔值格式转换数据。io.confluent.connect.avro.AvroConverter:将数据转换为 Apache Avro 格式,并从该格式转换数据。
在此版本中,Kafka Connect 不支持使用架构注册表针对远程架构进行验证。
如需了解每个连接器的首选转换器,请参阅相应连接器的文档。
默认转换器配置
所有受支持连接器的默认键和值转换器均为 org.apache.kafka.connect.json.JsonConverter。
配置连接器时,您需要为 Kafka 消息的键和值指定相应的转换器。例如,如果您使用的是 JSON 数据,请使用 JsonConverter。如果您的数据采用字符串格式,请使用 StringConverter。
一些常见配置包括:
tasks.max:为相应连接器创建的任务数量上限。此参数用于控制连接器的并行性。增加任务数量可以提高吞吐量,但也会增加资源消耗(CPU 和内存)。最佳值取决于工作负载和分配给 Connect 集群工作器的资源,对于接收器连接器,还取决于 Kafka 主题分区的数量。value.converter:用于在将消息发送到 Cloud Storage 存储桶之前序列化消息的值的转换器。常见的转化器包括:org.apache.kafka.connect.json.JsonConverter:适用于 JSON 数据。将此转换器与纯 JSON(不含架构)搭配使用时,您通常需要设置value.converter.schemas.enable=false。org.apache.kafka.connect.converters.ByteArrayConverter:为了在两个系统之间保留消息的精确内容。org.apache.kafka.connect.storage.StringConverter:适用于纯文本字符串。
key.converter:用于序列化消息键的转换器。与value.converter相同的转换器选项适用。如果您的消息没有密钥,通常可以使用org.apache.kafka.connect.storage.StringConverter。value.converter.schemas.enable:对于接收器连接器,在使用org.apache.kafka.connect.json.JsonConverter时,如果将此属性设置为true,则指示 Kafka Connect 查找并使用嵌入在传入 Kafka 消息中的架构。如果设置为false(默认值),Kafka Connect 会将数据视为不含嵌入式架构的纯 JSON。
转换(可选)
转换功能可在数据流水线中实现数据操纵或丰富。 借助转换,您可以在将各个消息发送到 Managed Service for Apache Kafka(对于源连接器)或外部系统(对于接收器连接器)之前对其进行修改。您可以使用转换来遮盖敏感数据、添加时间戳或重命名字段。
谓词(可选)
谓词可用于根据特定条件过滤数据。谓词充当应用转换的过滤器,根据消息属性确定转换适用于哪些消息。
在 Google Cloud中管理 Kafka Connect
借助 Kafka Connect,您可以专注于部署连接器,而 Google Cloud会处理底层基础架构和运营复杂性。以下是 Google Cloud 自动执行的操作以及您可以配置的操作:
Kafka Connect 服务会自动执行以下操作:
预配 Kafka Connect 工作器:创建 Connect 集群时,Kafka Connect 服务会自动在 Kubernetes 中预配工作器集群。
网络:Kafka Connect 服务会配置网络,以实现工作器、Managed Service for Apache Kafka 代理和外部系统之间的通信。在某些情况下,您可能需要对现有网络设置进行一些更改。
可用区级弹性:Kafka Connect 服务将工作器分布在至少三个可用区中,确保在发生可用区级服务中断时,数据处理能够继续进行。
身份验证:Kafka Connect 服务还会配置与 Kafka 代理的身份验证,以确保连接安全。
发布和升级:Kafka Connect 服务可管理工作线程配置更改、版本升级和安全补丁,确保您的部署始终处于最新状态。
在 Kafka Connect 服务中,您可以执行以下配置:
容量和网络限制:定义资源限制和网络配置,以优化性能和成本。
监控和日志记录:访问连接器的日志和指标,以监控性能并排查问题。
连接器生命周期管理:根据需要暂停、恢复、重启或停止连接器,以管理数据流水线。
限制
Kafka Connect 服务仅支持将 Managed Service for Apache Kafka 集群作为主 Kafka 集群。主集群是指 Kafka Connect 集群将元数据写入到的集群。
该服务不支持将自定义连接器插件上传到 Kafka Connect 集群。