更新 Google Cloud Managed Service for Apache Kafka 主题

创建主题后,您可以修改主题配置以更新以下属性:分区数和未默认设置为集群级已设置属性的主题配置。您只能增加分区数,而不能减少。

如需更新单个主题,您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库、Managed Kafka API 或开源 Apache Kafka API。

修改主题所需的角色和权限

如需获得修改主题所需的权限,请让您的管理员为您授予项目的 Managed Kafka Topic Editor(roles/managedkafka.topicEditor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含修改主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需修改主题,您需要拥有以下权限:

  • 更新主题: managedkafka.topics.update

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如需详细了解此角色,请参阅 Managed Service for Apache Kafka 预定义角色

修改主题

如需修改主题,请按以下步骤操作:

控制台

  1. 在 Google Cloud 控制台中,前往集群页面。

    转到“集群”

    系统会列出您在项目中创建的集群。

  2. 点击要修改的主题所属的集群。

    系统会打开集群详情页面。在集群详情页面中,资源标签页会列出主题。

  3. 点击要修改的主题。

    主题详细信息 页面打开。

  4. 如需进行修改,请点击修改

  5. 更改完成后,点击保存

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud managed-kafka topics update 命令:

    gcloud managed-kafka topics update TOPIC_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION_ID \
      --partitions=PARTITIONS \
      --configs=CONFIGS
    

    此命令用于修改指定 Managed Service for Apache Kafka 集群中现有主题的配置。您可以使用此命令来增加分区数量并更新主题级配置设置。

    替换以下内容:

    • TOPIC_ID:主题的 ID。
    • CLUSTER_ID:包含相应主题的集群的 ID。
    • LOCATION_ID:集群的位置。
    • PARTITIONS:可选:主题的更新分区数。您只能增加分区数量,不能减少。
    • CONFIGS:可选:要更新的配置设置列表。以键值对的英文逗号分隔列表形式指定。例如 retention.ms=3600000,retention.bytes=10000000
  3. REST

    在使用任何请求数据之前,请先进行以下替换:

    • PROJECT_ID:您的 Google Cloud 项目 ID
    • LOCATION:集群的位置
    • CLUSTER_ID:集群的 ID
    • TOPIC_ID:主题的 ID
    • UPDATE_MASK:要更新的字段,以完全限定名称的英文逗号分隔列表的形式表示。示例:partitionCount
    • PARTITION_COUNT:主题的更新后分区数

    HTTP 方法和网址:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK

    请求 JSON 正文:

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT
    }
    

    如需发送您的请求,请展开以下选项之一:

    您应该收到类似以下内容的 JSON 响应:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
        "createTime": "CREATE_TIME",
        "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
        "verb": "update",
        "requestedCancellation": false,
        "apiVersion": "v1"
      },
      "done": false
    }
    

    Go

    在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 20
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	TopicConfig := managedkafkapb.Topic{
    		Name:           topicPath,
    		PartitionCount: partitionCount,
    		Configs:        configs,
    	}
    	paths := []string{"partition_count", "configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateTopicRequest{
    		UpdateMask: updateMask,
    		Topic:      &TopicConfig,
    	}
    	topic, err := client.UpdateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 200;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "1");
              }
            };
        updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
      }
    
      public static void updateTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .putAllConfigs(configs)
                .build();
        String[] paths = {"partition_count", "configs"};
        FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateTopicRequest request =
              UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.updateTopic(request);
          System.out.printf("Updated topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 20
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.extend(["partition_count", "configs"])
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
    request = managedkafka_v1.UpdateTopicRequest(
        update_mask=update_mask,
        topic=topic,
    )
    
    try:
        response = client.update_topic(request=request)
        print("Updated topic:", response)
    except NotFound as e:
        print(f"Failed to update topic {topic_id} with error: {e.message}")
    

配置消息保留

Kafka 将消息存储在日志段文件中。默认情况下,Kafka 会在保留期限过后或分区超出数据大小阈值时删除段文件。您可以通过启用日志压缩来更改此行为。如果启用了日志压缩,则 Kafka 只会保留每个键的最新值。

Google Cloud Managed Service for Apache Kafka 使用分层存储,这意味着已完成的日志段会远程存储,而不是本地存储。如需详细了解分层存储,请参阅 Apache Kafka 文档中的分层存储

设置保留值

如果未启用日志压缩,则以下设置会控制 Kafka 存储日志段文件的方式:

  • retention.ms:保存分段文件的最长时间(以毫秒为单位)。
  • retention.bytes:每个分区要存储的最大字节数。如果分区中的数据超过此值,Kafka 会舍弃较旧的段文件。

如需更新这些设置,请使用 gcloud CLI 或 Kafka CLI:

gcloud

如需设置消息保留期限,请运行 gcloud managed-kafka topics update 命令。

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

替换以下内容:

  • TOPIC_ID:主题的 ID。
  • CLUSTER_ID:包含相应主题的集群的 ID。
  • LOCATION_ID:集群的位置。
  • RETENTION_PERIOD:存储分段文件的最长时间(以毫秒为单位)。
  • MAX_BYTES:每个分区要存储的字节数上限。

Kafka CLI

运行此命令之前,请在 Compute Engine 虚拟机上安装 Kafka 命令行工具。虚拟机必须能够访问连接到 Managed Service for Apache Kafka 集群的子网。按照 使用 Kafka 命令行工具生成和使用消息中的说明操作。

运行 kafka-configs.sh 命令:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

替换以下内容:

  • BOOTSTRAP_ADDRESS:Managed Service for Apache Kafka 集群的引导地址
  • TOPIC_ID:主题的 ID。
  • RETENTION_PERIOD:存储分段文件的最长时间(以毫秒为单位)。
  • MAX_BYTES:每个分区要存储的字节数上限。

启用日志压缩

如果启用了日志压缩,则 Kafka 只会存储每个键的最新消息。日志压缩默认处于停用状态。如需为主题启用日志压缩,请将 cleanup.policy 配置设置为 "compact",如下所示:

gcloud

运行 gcloud managed-kafka topics update 命令。

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=cleanup.policy=compact

替换以下内容:

  • TOPIC_ID:主题的 ID。
  • CLUSTER_ID:包含相应主题的集群的 ID。
  • LOCATION_ID:集群的位置。

Kafka CLI

运行此命令之前,请在 Compute Engine 虚拟机上安装 Kafka 命令行工具。虚拟机必须能够访问连接到 Managed Service for Apache Kafka 集群的子网。按照 使用 Kafka 命令行工具生成和使用消息中的说明操作。

运行 kafka-configs.sh 命令:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config cleanup.policy=compact

替换以下内容:

  • BOOTSTRAP_ADDRESS:Managed Service for Apache Kafka 集群的引导地址
  • TOPIC_ID:主题的 ID。

限制

  • 您无法替换远程存储的主题配置,例如 remote.storage.enable

  • 您无法替换日志段文件的主题配置,例如 segment.bytes

  • 为主题启用日志压缩功能会隐式停用该主题的分层存储功能。主题的所有日志文件都存储在本地。

后续步骤