创建 Google Cloud Managed Service for Apache Kafka 主题

在 Managed Service for Apache Kafka 中,消息按主题进行整理。 主题由分区组成。分区是 Kafka 集群中由单个代理拥有的有序且不可变的记录序列。您必须先创建一个主题,然后才能发布或处理消息。

如需创建主题,您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库、Managed Kafka API 或开源 Apache Kafka API。

准备工作

您必须先创建集群,然后才能创建主题。 确保您已设置以下各项:

创建主题所需的角色和权限

如需获得创建主题所需的权限,请让您的管理员为您授予项目的 Managed Kafka Topic Editor (roles/managedkafka.topicEditor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含创建主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

您需要具备以下权限才能创建主题:

  • 创建主题: managedkafka.topics.create

您也可以使用自定义角色或其他预定义角色来获取这些权限。

Managed Kafka Topic Editor 角色还包含 Managed Kafka Viewer 角色。如需详细了解此角色,请参阅 Managed Service for Apache Kafka 预定义角色

Managed Service for Apache Kafka 主题的属性

创建或更新 Managed Service for Apache Kafka 主题时,您必须指定以下属性。

主题名称

您正在创建的 Managed Service for Apache Kafka 主题的名称。 如需了解如何命名主题,请参阅 Managed Service for Apache Kafka 资源命名指南。 主题的名称不可更改。

分区数

主题中的分区数量。您可以修改主题以增加主题的分区数,但不能减少分区数。增加使用键的主题的分区数量可能会改变消息的分布方式。

复制因子

每个分区的副本数。如果您未指定值,则系统会使用集群的默认复制系数。

如果复制因子较高,则在出现代理故障时,由于数据已复制到多个代理,因此可以提高数据一致性。对于生产环境,建议使用 3 或更高的复制因子。较高的副本数会增加主题的本地存储和数据传输费用。 不过,它们不会增加持久性存储空间费用。复制因子不能超过可用代理的数量。

其他参数

您还可以设置其他 Apache Kafka 主题级配置参数。 这些变量以 key=value 对的形式指定,用于替换集群默认值。

与主题相关的配置具有服务器默认值和可选的每个主题的替换值。格式为以英文逗号分隔的 KEY=VALUE 对列表,其中 KEY 是 Kafka 主题配置属性的名称,VALUE 是所需的设置。这些键值对可帮助您替换集群默认值。例如 flush.ms=10compression.type=producer

如需获取所有支持的主题层级配置列表,请参阅 Apache Kafka 文档中的主题层级配置

创建主题

在创建主题之前,请先查看主题属性

控制台

  1. 在 Google Cloud 控制台中,前往集群页面。

    转到“集群”

  2. 点击要为其创建主题的集群。

    系统会打开集群详情页面。

  3. 在集群详细信息页面中,点击创建主题

    系统会打开创建 Kafka 主题页面。

  4. 对于主题名称,输入一个字符串。

  5. 对于分区数,请输入所需的分区数或保留默认值。

  6. 对于复制因子,请输入所需的复制因子,或保留默认值。

  7. (可选)如需更改任何主题配置,请在配置字段中以英文逗号分隔的键值对形式添加这些配置。

  8. 点击创建

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud managed-kafka topics create 命令:

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    替换以下内容:

    • TOPIC_ID:主题的名称。
    • CLUSTER:要在其中创建主题的集群的名称。
    • LOCATION:集群的区域。
    • PARTITIONS:相应主题的分区数。
    • REPLICATION_FACTOR:相应主题的复制因子。
    • CONFIGS:主题级可选参数。 以逗号分隔的键值对形式指定。例如 compression.type=producer
  3. Kafka CLI

    运行此命令之前,请在 Compute Engine 虚拟机上安装 Kafka 命令行工具。虚拟机必须能够访问连接到 Managed Service for Apache Kafka 集群的子网。按照 使用 Kafka 命令行工具生成和使用消息中的说明操作。

    运行 kafka-topics.sh 命令,如下所示:

    kafka-topics.sh --create --if-not-exists \
      --bootstrap-server=BOOTSTRAP_ADDRESS \
      --command-config client.properties \
      --topic TOPIC_ID \
      --partitions PARTITIONS \
      --replication-factor REPLICATION_FACTOR
    

    替换以下内容:

    • BOOTSTRAP_ADDRESS:Managed Service for Apache Kafka 集群的引导地址

    • TOPIC_ID:主题的名称。

    • PARTITIONS:主题的分区数。

    • REPLICATION_FACTOR:主题的复制因子。

    REST

    在使用任何请求数据之前,请先进行以下替换:

    • PROJECT_ID:您的 Google Cloud 项目 ID
    • LOCATION:集群的位置
    • CLUSTER_ID:集群的 ID
    • TOPIC_ID:主题的 ID
    • PARTITION_COUNT:主题的分区数
    • REPLICATION_FACTOR:每个分区的副本数

    HTTP 方法和网址:

    POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

    请求 JSON 正文:

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    如需发送您的请求,请展开以下选项之一:

    您应该收到类似以下内容的 JSON 响应:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Terraform

    您可以使用 Terraform 资源创建主题

    resource "google_managed_kafka_topic" "default" {
      project            = data.google_project.default.project_id # Replace this with your project ID in quotes
      topic_id           = "my-topic-id"
      cluster            = google_managed_kafka_cluster.default.cluster_id
      location           = "us-central1"
      partition_count    = 2
      replication_factor = 3
    }

    如需了解如何应用或移除 Terraform 配置,请参阅基本 Terraform 命令

    Go

    在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 10
    	// replicationFactor := 3
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	topicConfig := &managedkafkapb.Topic{
    		Name:              topicPath,
    		PartitionCount:    partitionCount,
    		ReplicationFactor: replicationFactor,
    		Configs:           configs,
    	}
    
    	req := &managedkafkapb.CreateTopicRequest{
    		Parent:  clusterPath,
    		TopicId: topicID,
    		Topic:   topicConfig,
    	}
    	topic, err := client.CreateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.CreateTopicRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 100;
        int replicationFactor = 3;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "2");
              }
            };
        createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
      }
    
      public static void createTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          int replicationFactor,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .setReplicationFactor(replicationFactor)
                .putAllConfigs(configs)
                .build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          CreateTopicRequest request =
              CreateTopicRequest.newBuilder()
                  .setParent(ClusterName.of(projectId, region, clusterId).toString())
                  .setTopicId(topicId)
                  .setTopic(topic)
                  .build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.createTopic(request);
          System.out.printf("Created topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC

    from google.api_core.exceptions import AlreadyExists
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 10
    # replication_factor = 3
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.replication_factor = replication_factor
    # For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
    topic.configs = configs
    
    request = managedkafka_v1.CreateTopicRequest(
        parent=client.cluster_path(project_id, region, cluster_id),
        topic_id=topic_id,
        topic=topic,
    )
    
    try:
        response = client.create_topic(request=request)
        print("Created topic:", response.name)
    except AlreadyExists as e:
        print(f"Failed to create topic {topic.name} with error: {e.message}")
    

后续步骤