创建 Google Cloud Managed Service for Apache Kafka 主题

在 Managed Service for Apache Kafka 中,消息按主题进行整理。 主题由分区组成。分区是 Kafka 集群中单个代理拥有的有序且不可变的记录序列。 您必须先创建主题,然后才能发布或使用消息。

如需创建主题,您可以使用 Google Cloud 控制台、Google Cloud CLI、 客户端库、Managed Kafka API 或开源 Apache Kafka API。

准备工作

您必须先创建集群,然后才能创建主题。 请确保您已设置以下内容:

创建主题所需的角色和权限

如要获得创建主题所需的权限,请让您的管理员为您授予项目的Managed Kafka Topic Editor (roles/managedkafka.topicEditor) IAM 角色。如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色可提供创建主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需创建主题,您需要以下权限:

  • 创建主题: managedkafka.topics.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

Managed Kafka Topic Editor 角色还包含 Managed Kafka Viewer 角色。 如需详细了解此角色,请参阅 Managed Service for Apache Kafka 预定义角色

Managed Service for Apache Kafka 主题的属性

创建或更新 Managed Service for Apache Kafka 主题时,您必须指定以下属性。

主题名称

您正在创建的 Managed Service for Apache Kafka 主题的名称。 如需了解如何命名主题的指南, 请参阅 Managed Service for Apache Kafka 资源命名指南。 主题的名称是不可变的。

分区数

主题中的分区数量。您可以修改主题以增加主题的分区数,但不能减少。 增加使用键的主题的分区数可能会改变消息的分配方式。

复制因子

每个分区的副本数。如果您未指定值, 系统会使用集群的默认复制因子。

如果代理发生故障,较高的复制因子可以提高数据一致性,因为数据会复制到多个代理。对于生产环境,建议使用 3 或更高的复制因子。较高的副本数会增加主题的本地存储空间和数据传输费用。 不过,它们不会增加永久性存储空间费用。复制因子不能超过可用代理的数量。

其他参数

您还可以设置其他 Apache Kafka 主题级配置参数。 这些参数以 key=value 对的形式指定,用于替换集群默认值。

与主题相关的配置具有服务器默认值和可选的按主题替换项。格式为 KEY=VALUE 对的逗号分隔列表, 其中 KEY 是 Kafka 主题配置属性的名称,VALUE 是所需的设置。这些键值对可帮助您替换集群 默认值。例如,flush.ms=10compression.type=producer

如需获取所有支持的主题级配置列表,请参阅 Apache Kafka 文档中的主题级配置

创建主题

创建主题之前,请查看主题属性

控制台

  1. 在 Google Cloud 控制台中,前往 集群 页面。

    转到“集群”

  2. 点击要为其创建主题的集群。

    系统会打开集群详情 页面。

  3. 在集群详情页面中,点击创建主题

    系统会打开创建 Kafka 主题 页面。

  4. 对于主题名称,输入一个字符串。

  5. 对于分区数,输入所需的分区数,或保留 默认值。

  6. 对于复制因子,输入所需的复制因子,或保留 默认值。

  7. (可选)如需更改任何主题配置,请在配置 字段中以逗号分隔的键值对的形式添加这些配置。

  8. 点击创建

gcloud

  1. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在控制台的底部启动,并显示命令行提示符。 Google Cloud Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境。该会话可能需要几秒钟来完成初始化。

  2. 运行 gcloud managed-kafka topics create 命令:

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    替换以下内容:

    • TOPIC_ID:主题的名称。
    • CLUSTER:您要在其中创建主题的集群的名称。
    • LOCATION:集群的区域。
    • PARTITIONS:主题的分区数。
    • REPLICATION_FACTOR:主题的复制因子。
    • CONFIGS:主题级可选参数。 以逗号分隔的键值对的形式指定。例如,compression.type=producer

Kafka CLI

运行此命令之前,请在 Compute Engine 虚拟机上安装 Kafka 命令行工具。虚拟机必须能够访问连接到 Managed Service for Apache Kafka 集群的子网。请按照 使用 Kafka 命令行工具生成和使用消息中的说明操作。

按如下方式运行 kafka-topics.sh 命令:

kafka-topics.sh --create --if-not-exists \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --topic TOPIC_ID \
  --partitions PARTITIONS \
  --replication-factor REPLICATION_FACTOR

替换以下内容:

  • BOOTSTRAP_ADDRESS:Managed Service for Apache Kafka 集群的 引导地址

  • TOPIC_ID:主题的名称。

  • PARTITIONS:主题的分区数。

  • REPLICATION_FACTOR:主题的复制因子。

REST

在使用任何请求数据之前, 请先进行以下替换:

  • PROJECT_ID:您的 Google Cloud 项目 ID
  • LOCATION:集群的位置
  • CLUSTER_ID:集群的 ID
  • TOPIC_ID:主题的 ID
  • PARTITION_COUNT:主题的分区数
  • REPLICATION_FACTOR:每个分区的副本数

HTTP 方法和网址:

POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

请求 JSON 正文:

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

如需发送您的请求,请展开以下选项之一:

您应该收到类似以下内容的 JSON 响应:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Terraform

您可以使用 Terraform 资源创建 主题

resource "google_managed_kafka_topic" "default" {
  project            = data.google_project.default.project_id # Replace this with your project ID in quotes
  topic_id           = "my-topic-id"
  cluster            = google_managed_kafka_cluster.default.cluster_id
  location           = "us-central1"
  partition_count    = 2
  replication_factor = 3
}

如需了解如何应用或移除 Terraform 配置,请参阅 基本 Terraform 命令

Go

在试用此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情, 请参阅 Managed Service for Apache Kafka Go API 参考文档

如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。如需了解详情, 请参阅 为本地开发环境设置 ADC

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 10
	// replicationFactor := 3
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	topicConfig := &managedkafkapb.Topic{
		Name:              topicPath,
		PartitionCount:    partitionCount,
		ReplicationFactor: replicationFactor,
		Configs:           configs,
	}

	req := &managedkafkapb.CreateTopicRequest{
		Parent:  clusterPath,
		TopicId: topicID,
		Topic:   topicConfig,
	}
	topic, err := client.CreateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
	return nil
}

Java

在试用此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情, 请参阅 Managed Service for Apache Kafka Java API 参考文档

如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅 为本地开发环境设置 ADC

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.CreateTopicRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 100;
    int replicationFactor = 3;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "2");
          }
        };
    createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
  }

  public static void createTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      int replicationFactor,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .setReplicationFactor(replicationFactor)
            .putAllConfigs(configs)
            .build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      CreateTopicRequest request =
          CreateTopicRequest.newBuilder()
              .setParent(ClusterName.of(projectId, region, clusterId).toString())
              .setTopicId(topicId)
              .setTopic(topic)
              .build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.createTopic(request);
      System.out.printf("Created topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
    }
  }
}

Python

在试用此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情, 请参阅 Managed Service for Apache Kafka Python API 参考文档

如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。如需了解详情, 请参阅为本地开发环境设置 ADC

from google.api_core.exceptions import AlreadyExists
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 10
# replication_factor = 3
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.replication_factor = replication_factor
# For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
topic.configs = configs

request = managedkafka_v1.CreateTopicRequest(
    parent=client.cluster_path(project_id, region, cluster_id),
    topic_id=topic_id,
    topic=topic,
)

try:
    response = client.create_topic(request=request)
    print("Created topic:", response.name)
except AlreadyExists as e:
    print(f"Failed to create topic {topic.name} with error: {e.message}")

接下来怎么做?