更新 Google Cloud Managed Service for Apache Kafka 主题

创建主题后,您可以修改主题配置,以更新以下属性:分区数和主题配置(这些配置不会默认设置为集群级已设置的属性)。您只能增加分区数,不能减少。

如需更新单个主题,您可以使用 Google Cloud 控制台、Google Cloud CLI、 客户端库、Managed Kafka API 或开源 Apache Kafka API。

修改主题所需的角色和权限

如需获得修改主题所需的权限,请让您的管理员为您授予项目的 Managed Kafka Topic Editor(roles/managedkafka.topicEditor) IAM 角色。如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色可提供 修改主题所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需修改主题,您需要以下权限:

  • 更新主题: managedkafka.topics.update

您也可以使用自定义角色或其他预定义角色来获取这些权限。

如需详细了解此角色,请参阅 Managed Service for Apache Kafka 预定义角色

修改主题

如需修改主题,请按以下步骤操作:

控制台

  1. 在 Google Cloud 控制台中,前往 集群 页面。

    转到“集群”

    您在项目中创建的集群会列出。

  2. 点击您要修改的主题所属的集群。

    集群详情 页面随即打开。在集群详情页面的资源 标签页中,主题会列出。

  3. 点击您要修改的主题。

    主题详情 页面随即打开。

  4. 如需进行修改,请点击修改

  5. 更改后,点击保存

gcloud

  1. 在 Google Cloud 控制台中,激活 Cloud Shell。

    激活 Cloud Shell

    Cloud Shell 会话随即会在控制台的底部启动,并显示命令行提示符。 Google Cloud Cloud Shell 是一个已安装 Google Cloud CLI 且已为当前项目设置值的 Shell 环境 。该会话可能需要几秒钟来完成初始化。

  2. 运行 gcloud managed-kafka topics update 命令:

    gcloud managed-kafka topics update TOPIC_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION_ID \
      --partitions=PARTITIONS \
      --configs=CONFIGS
    

    此命令会修改指定 Managed Service for Apache Kafka 集群中现有主题的配置。您可以使用此命令来增加分区数并更新主题级配置设置。

    替换以下内容:

    • TOPIC_ID:主题的 ID。
    • CLUSTER_ID:包含 该主题的集群的 ID。
    • LOCATION_ID:集群的位置。
    • PARTITIONS:(可选)主题的更新后的 分区数。您只能增加分区数,不能减少。
    • CONFIGS:(可选)要更新的配置设置 列表。指定为键值对的逗号分隔列表。例如,retention.ms=3600000,retention.bytes=10000000

REST

在使用任何请求数据之前, 请先进行以下替换:

  • PROJECT_ID:您的 Google Cloud 项目 ID
  • LOCATION:集群的位置
  • CLUSTER_ID:集群的 ID
  • TOPIC_ID:主题的 ID
  • UPDATE_MASK:要更新的字段,以逗号分隔列表的形式表示完全限定名称。示例: partitionCount
  • PARTITION_COUNT:主题的更新后的分区数

HTTP 方法和网址:

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK

请求 JSON 正文:

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT
}

如需发送您的请求,请展开以下选项之一:

您应该收到类似以下内容的 JSON 响应:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
    "createTime": "CREATE_TIME",
    "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
    "verb": "update",
    "requestedCancellation": false,
    "apiVersion": "v1"
  },
  "done": false
}

Go

在试用此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情, 请参阅 Managed Service for Apache Kafka Go API 参考文档

如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。如需了解详情, 请参阅 为本地开发环境设置 ADC

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 20
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	TopicConfig := managedkafkapb.Topic{
		Name:           topicPath,
		PartitionCount: partitionCount,
		Configs:        configs,
	}
	paths := []string{"partition_count", "configs"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateTopicRequest{
		UpdateMask: updateMask,
		Topic:      &TopicConfig,
	}
	topic, err := client.UpdateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
	return nil
}

Java

在试用此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情, 请参阅 Managed Service for Apache Kafka Java API 参考文档

如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。如需了解详情,请参阅 为本地开发环境设置 ADC

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class UpdateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 200;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "1");
          }
        };
    updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
  }

  public static void updateTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .putAllConfigs(configs)
            .build();
    String[] paths = {"partition_count", "configs"};
    FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.updateTopic(request);
      System.out.printf("Updated topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

在试用此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情, 请参阅 Managed Service for Apache Kafka Python API 参考文档

如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。如需了解详情, 请参阅为本地开发环境设置 ADC

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 20
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.configs = configs
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.extend(["partition_count", "configs"])

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
request = managedkafka_v1.UpdateTopicRequest(
    update_mask=update_mask,
    topic=topic,
)

try:
    response = client.update_topic(request=request)
    print("Updated topic:", response)
except NotFound as e:
    print(f"Failed to update topic {topic_id} with error: {e.message}")

配置消息保留

Kafka 会将消息存储在日志段文件中。 默认情况下,Kafka 会在保留期限过后或分区超出数据大小阈值时删除段文件。您可以通过启用 日志压缩来更改此行为。如果启用了日志压缩,Kafka 将仅保留每个键的最新值。

Google Cloud Managed Service for Apache Kafka 使用 分层存储,这意味着已完成的 日志段会远程存储,而不是本地存储。如需详细了解 分层存储,请参阅 Apache Kafka 文档中的 分层存储。

设置保留值

如果未启用日志压缩,则以下设置将控制 Kafka 如何存储日志段文件:

  • retention.ms:保存段文件的最长时间,以毫秒为单位。
  • retention.bytes:每个分区要存储的最大字节数。如果分区中的数据超出此值,Kafka 将舍弃较旧的段文件。

如需更新这些设置,请使用 gcloud CLI 或 Kafka CLI:

gcloud

如需设置消息保留,请运行 gcloud managed-kafka topics update 命令。

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

替换以下内容:

  • TOPIC_ID:主题的 ID。
  • CLUSTER_ID:包含该 主题的集群的 ID。
  • LOCATION_ID:集群的位置。
  • RETENTION_PERIOD:存储段文件的最长时间,以毫秒为单位。
  • MAX_BYTES:每个分区要存储的最大字节数。

Kafka CLI

在运行此命令之前,请在 Compute Engine 虚拟机上安装 Kafka 命令行工具。虚拟机必须能够访问连接到 Managed Service for Apache Kafka 集群的子网。请按照 使用 Kafka 命令行工具生成和使用消息中的说明进行操作。

运行 kafka-configs.sh 命令:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

替换以下内容:

  • BOOTSTRAP_ADDRESS:Managed Service for Apache Kafka 集群的 引导地址
  • TOPIC_ID:主题的 ID。
  • RETENTION_PERIOD:存储段文件的最长时间,以毫秒为单位。
  • MAX_BYTES:每个分区要存储的最大字节数。

启用日志压缩

如果启用了日志压缩,Kafka 将仅存储每个键的最新消息。日志压缩默认处于停用状态。如需为 主题启用日志压缩,请将 cleanup.policy 配置设置为 "compact",如下所示:

gcloud

运行 gcloud managed-kafka topics update 命令。

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=cleanup.policy=compact

替换以下内容:

  • TOPIC_ID:主题的 ID。
  • CLUSTER_ID:包含该 主题的集群的 ID。
  • LOCATION_ID:集群的位置。

Kafka CLI

在运行此命令之前,请在 Compute Engine 虚拟机上安装 Kafka 命令行工具。虚拟机必须能够访问连接到 Managed Service for Apache Kafka 集群的子网。请按照 使用 Kafka 命令行工具生成和使用消息中的说明进行操作。

运行 kafka-configs.sh 命令:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config cleanup.policy=compact

替换以下内容:

  • BOOTSTRAP_ADDRESS:Managed Service for Apache Kafka 集群的 引导地址
  • TOPIC_ID:主题的 ID。

限制

  • 您无法替换远程存储的主题配置,例如 remote.storage.enable

  • 您无法替换日志段文件的主题配置,例如 segment.bytes

  • 为主题启用日志压缩会隐式停用该主题的分层存储。该主题的所有日志文件都存储在本地。

接下来怎么做?