借助 Cloud Storage 接收器连接器,您可以将数据从 Kafka 主题流式传输到 Cloud Storage 存储分区。这有助于以经济实惠且可伸缩的方式存储和处理大量数据。
准备工作
在创建 Cloud Storage Sink 连接器之前,请确保您具备以下条件:
为您的 Connect 集群创建 Managed Service for Apache Kafka 集群。这是与 Connect 集群关联的 Kafka 主集群。这也是构成连接器流水线一端的源集群。
创建 Connect 集群以托管 Cloud Storage 接收器连接器。
创建 Cloud Storage 存储桶以存储从 Kafka 流式传输的数据。
在源集群中创建并配置 Kafka 主题。数据会从该 Kafka 主题移至目标 Cloud Storage 存储桶。
所需的角色和权限
如需获得创建 Cloud Storage Sink 连接器所需的权限,请让您的管理员为您授予项目的 Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) IAM 角色。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
此预定义角色包含创建 Cloud Storage Sink 连接器所需的权限。如需查看所需的确切权限,请展开所需权限部分:
所需权限
如需创建 Cloud Storage Sink 连接器,您需要具备以下权限:
-
在父级 Connect 集群上授予创建连接器的权限:
managedkafka.connectors.create
如需详细了解 Managed Kafka Connector Editor 角色,请参阅 Managed Service for Apache Kafka 预定义角色。
如果您的 Managed Service for Apache Kafka 集群与 Connect 集群位于同一项目中,则无需进一步授予权限。如果 Connect 集群位于其他项目中,请参阅在其他项目中创建 Connect 集群。
授予写入 Cloud Storage 存储桶的权限
Connect 集群服务账号(格式为 service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com)需要以下 Cloud Storage 权限:
storage.objects.createstorage.objects.delete
为此,请在包含 Cloud Storage 存储桶的项目中,向 Connect 集群服务账号授予 Storage Object User (roles/storage.objectUser) 角色。
Cloud Storage 接收器连接器的工作原理
Cloud Storage 接收器连接器从一个或多个 Kafka 主题拉取数据,并将这些数据写入单个 Cloud Storage 存储桶中的对象。
下面详细介绍了 Cloud Storage Sink 连接器如何复制数据:
连接器会使用源集群内一个或多个 Kafka 主题中的消息。
连接器会将数据写入您在连接器配置中指定的目标 Cloud Storage 存储桶。
连接器在将数据写入 Cloud Storage 存储桶时,会参考连接器配置中的特定属性来设置数据格式。默认情况下,输出文件采用 CSV 格式。您可以配置
format.output.type属性来指定不同的输出格式,例如 JSON。连接器还会命名写入 Cloud Storage 存储桶的文件。您可以使用
file.name.prefix和file.name.template属性自定义文件名。例如,您可以在文件名中添加 Kafka 主题名称或消息键。Kafka 记录包含三个组成部分:标头、键、值。
您可以通过设置
format.output.fields来在输出文件中包含标题。 例如format.output.fields=value,headers。您可以通过设置
format.output.fields来在输出文件中包含键,以包含key。例如format.output.fields=key,value,headers。您还可以通过在
file.name.template属性中添加key来按键对记录进行分组。
您可以默认在输出文件中包含值,因为
format.output.fields默认设置为value。连接器会将转换并格式化后的数据写入指定的 Cloud Storage 存储桶。
如果您使用
file.compression.type属性配置了文件压缩,连接器会压缩存储在 Cloud Storage 存储桶中的文件。转换器配置受
format.output.type属性限制。例如,当
format.output.type设置为csv时,键转换器必须为org.apache.kafka.connect.converters.ByteArrayConverter或org.apache.kafka.connect.storage.StringConverter,而值转换器必须为org.apache.kafka.connect.converters.ByteArrayConverter。当
format.output.type设置为json时,即使value.converter.schemas.enable属性为 true,值和键架构也不会与输出文件中的数据一起写入。
tasks.max属性用于控制连接器的并行级别。增加tasks.max可以提高吞吐量,但实际并行性受 Kafka 主题中分区数量的限制。
Cloud Storage 接收器连接器的属性
创建 Cloud Storage 接收器连接器时,请指定以下属性。
连接器名称
连接器的名称或 ID。如需了解有关如何命名资源的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。 该名称不可变。
连接器插件类型
在Google Cloud 控制台中,选择 Cloud Storage Sink 作为连接器插件类型。如果您不使用界面来配置连接器,还必须指定连接器类。
主题
连接器从中消费消息的 Kafka 主题。
您可以指定一个或多个主题,也可以使用正则表达式来匹配多个主题。例如,topic.* 可匹配以“topic”开头的所有主题。这些主题必须存在于与您的 Connect 集群关联的 Managed Service for Apache Kafka 集群中。
Cloud Storage 存储桶
选择或创建用于存储数据的 Cloud Storage 存储桶。
配置
您可以在此部分中为 Cloud Storage Sink 连接器指定其他特定于连接器的配置属性。
由于 Kafka 主题中的数据可以采用各种格式(例如 Avro、JSON 或原始字节),因此配置的关键部分是指定转换器。转换器会将 Kafka 主题中使用的格式的数据转换为 Kafka Connect 的标准化内部格式。然后,Cloud Storage Sink 连接器会获取此内部数据,并将其转换为 Cloud Storage 存储桶所需的格式,然后再写入数据。
如需详细了解转换器在 Kafka Connect 中的作用、支持的转换器类型和常见配置选项,请参阅转换器。
以下是一些特定于 Cloud Storage Sink 连接器的配置:
gcs.credentials.default:是否自动从执行环境中发现 Google Cloud 凭据。必须设置为true。gcs.bucket.name:指定写入数据的 Cloud Storage 存储桶的名称。必须设置。file.compression.type:为存储在 Cloud Storage 存储桶中的文件设置压缩类型。例如gzip、snappy、zstd和none。默认值为none。file.name.prefix:要添加到 Cloud Storage 存储桶中存储的每个文件名称中的前缀。默认值为空。format.output.type:用于将数据写入 Cloud Storage 输出文件的数据格式类型。支持的值包括:csv、json、jsonl和parquet。默认值为csv。
如需查看此连接器特有的可用配置属性的列表,请参阅 Cloud Storage Sink 连接器配置。
创建 Cloud Storage 接收器连接器
在创建连接器之前,请查看 Cloud Storage 接收器连接器的属性的相关文档。
控制台
在 Google Cloud 控制台中,前往连接集群页面。
点击要为其创建连接器的 Connect 集群。
系统会显示连接集群详情页面。
点击创建连接器。
系统会显示创建 Kafka 连接器页面。
对于连接器名称,请输入一个字符串。
有关如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。
对于连接器插件,选择 Cloud Storage Sink。
指定可从中流式传输数据的主题。
选择用于存储数据的 Storage Bucket。
(可选)在配置部分中配置其他设置。
选择任务重启政策。如需了解详情,请参阅任务重启政策。
点击创建。
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
运行
gcloud managed-kafka connectors create命令:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILE替换以下内容:
CONNECTOR_ID:连接器的 ID 或名称。 有关如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。连接器的名称不可变。
LOCATION:您创建连接器的位置。此位置必须与您创建 Connect 集群的位置相同。
CONNECT_CLUSTER_ID:创建连接器的 Connect 集群的 ID。
CONFIG_FILE:BigQuery Sink 连接器的 YAML 配置文件路径。
以下是 Cloud Storage Sink 连接器的配置文件示例:
connector.class: "io.aiven.kafka.connect.gcs.GcsSinkConnector" tasks.max: "1" topics: "GMK_TOPIC_ID" gcs.bucket.name: "GCS_BUCKET_NAME" gcs.credentials.default: "true" format.output.type: "json" name: "GCS_SINK_CONNECTOR_ID" value.converter: "org.apache.kafka.connect.json.JsonConverter" value.converter.schemas.enable: "false" key.converter: "org.apache.kafka.connect.storage.StringConverter"替换以下内容:
GMK_TOPIC_ID:Managed Service for Apache Kafka 主题的 ID,数据从该主题流向 Cloud Storage 接收器连接器。
GCS_BUCKET_NAME:作为流水线接收器的 Cloud Storage 存储桶的名称。
GCS_SINK_CONNECTOR_ID:Cloud Storage Sink 连接器的 ID 或名称。有关如何命名连接器的指南,请参阅 Managed Service for Apache Kafka 资源命名指南。连接器的名称不可变。
Terraform
您可以使用 Terraform 资源创建连接器。
如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令。
Go
在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC。
Java
在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC。
Python
在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC。
创建连接器后,您可以修改、删除、暂停、停止或重启连接器。