在 Managed Service for Apache Kafka 中,消息按主题进行整理。 主题由分区组成。分区是 Kafka 集群中单个代理拥有的有序且不可变的记录序列。 您必须先创建主题,然后才能发布或使用消息。
如需创建主题,您可以使用 Google Cloud 控制台、Google Cloud CLI、 客户端库、Managed Kafka API 或开源 Apache Kafka API。
准备工作
您必须先创建集群,然后才能创建主题。 请确保您已设置以下内容:
创建主题所需的角色和权限
如要获得创建主题所需的权限,请让您的管理员为您授予项目的Managed Kafka Topic Editor (roles/managedkafka.topicEditor) IAM 角色。如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
此预定义角色可提供创建主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:
所需权限
如需创建主题,您需要以下权限:
-
创建主题:
managedkafka.topics.create
Managed Kafka Topic Editor 角色还包含 Managed Kafka Viewer 角色。 如需详细了解此角色,请参阅 Managed Service for Apache Kafka 预定义角色。
Managed Service for Apache Kafka 主题的属性
创建或更新 Managed Service for Apache Kafka 主题时,您必须指定以下属性。
主题名称
您正在创建的 Managed Service for Apache Kafka 主题的名称。 如需了解如何命名主题的指南, 请参阅 Managed Service for Apache Kafka 资源命名指南。 主题的名称是不可变的。
分区数
主题中的分区数量。您可以修改主题以增加主题的分区数,但不能减少。 增加使用键的主题的分区数可能会改变消息的分配方式。
复制因子
每个分区的副本数。如果您未指定值, 系统会使用集群的默认复制因子。
如果代理发生故障,较高的复制因子可以提高数据一致性,因为数据会复制到多个代理。对于生产环境,建议使用 3 或更高的复制因子。较高的副本数会增加主题的本地存储空间和数据传输费用。 不过,它们不会增加永久性存储空间费用。复制因子不能超过可用代理的数量。
其他参数
您还可以设置其他 Apache Kafka 主题级配置参数。
这些参数以 key=value 对的形式指定,用于替换集群默认值。
与主题相关的配置具有服务器默认值和可选的按主题替换项。格式为 KEY=VALUE 对的逗号分隔列表,
其中 KEY 是 Kafka 主题配置属性的名称,VALUE
是所需的设置。这些键值对可帮助您替换集群
默认值。例如,flush.ms=10 和
compression.type=producer。
如需获取所有支持的主题级配置列表,请参阅 Apache Kafka 文档中的主题级配置 。
创建主题
创建主题之前,请查看主题属性。
控制台
在 Google Cloud 控制台中,前往 集群 页面。
点击要为其创建主题的集群。
系统会打开集群详情 页面。
在集群详情页面中,点击创建主题 。
系统会打开创建 Kafka 主题 页面。
对于主题名称,输入一个字符串。
对于分区数,输入所需的分区数,或保留 默认值。
对于复制因子,输入所需的复制因子,或保留 默认值。
(可选)如需更改任何主题配置,请在配置 字段中以逗号分隔的键值对的形式添加这些配置。
点击创建 。
gcloud
-
在 Google Cloud 控制台中,激活 Cloud Shell。
Cloud Shell 会话随即会在控制台的底部启动,并显示命令行提示符。 Google Cloud Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟来完成初始化。
运行
gcloud managed-kafka topics create命令:gcloud managed-kafka topics create TOPIC_ID \ --cluster=CLUSTER_ID --location=LOCATION_ID \ --partitions=PARTITIONS \ --replication-factor=REPLICATION_FACTOR \ --configs=CONFIGS替换以下内容:
- TOPIC_ID:主题的名称。
- CLUSTER:您要在其中创建主题的集群的名称。
- LOCATION:集群的区域。
- PARTITIONS:主题的分区数。
- REPLICATION_FACTOR:主题的复制因子。
- CONFIGS:主题级可选参数。
以逗号分隔的键值对的形式指定。例如,
compression.type=producer。
Kafka CLI
运行此命令之前,请在 Compute Engine 虚拟机上安装 Kafka 命令行工具。虚拟机必须能够访问连接到 Managed Service for Apache Kafka 集群的子网。请按照 使用 Kafka 命令行工具生成和使用消息中的说明操作。
按如下方式运行 kafka-topics.sh 命令:
kafka-topics.sh --create --if-not-exists \
--bootstrap-server=BOOTSTRAP_ADDRESS \
--command-config client.properties \
--topic TOPIC_ID \
--partitions PARTITIONS \
--replication-factor REPLICATION_FACTOR
替换以下内容:
BOOTSTRAP_ADDRESS:Managed Service for Apache Kafka 集群的 引导地址 。
TOPIC_ID:主题的名称。
PARTITIONS:主题的分区数。
REPLICATION_FACTOR:主题的复制因子。
REST
在使用任何请求数据之前, 请先进行以下替换:
-
PROJECT_ID:您的 Google Cloud 项目 ID -
LOCATION:集群的位置 -
CLUSTER_ID:集群的 ID -
TOPIC_ID:主题的 ID -
PARTITION_COUNT:主题的分区数 -
REPLICATION_FACTOR:每个分区的副本数
HTTP 方法和网址:
POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID
请求 JSON 正文:
{
"name": "TOPIC_ID",
"partitionCount": PARTITION_COUNT,
"replicationFactor": REPLICATION_FACTOR
}
如需发送您的请求,请展开以下选项之一:
您应该收到类似以下内容的 JSON 响应:
{
"name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
"partitionCount": PARTITION_COUNT,
"replicationFactor": REPLICATION_FACTOR
}
Terraform
您可以使用 Terraform 资源创建 主题。
如需了解如何应用或移除 Terraform 配置,请参阅 基本 Terraform 命令。
Go
在试用此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情, 请参阅 Managed Service for Apache Kafka Go API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。如需了解详情, 请参阅 为本地开发环境设置 ADC。
Java
在试用此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情, 请参阅 Managed Service for Apache Kafka Java API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅 为本地开发环境设置 ADC。
Python
在试用此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情, 请参阅 Managed Service for Apache Kafka Python API 参考文档。
如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。如需了解详情, 请参阅为本地开发环境设置 ADC。