更新 Google Cloud Managed Service for Apache Kafka 集群

您可以修改 Google Cloud Managed Service for Apache Kafka 集群,以更新 vCPU 数量、内存、子网、加密类型或标签等属性。您还可以配置在向集群添加代理时,服务是否在代理之间重新平衡分区。该服务会根据集群的内存和 vCPU 配置自动创建新代理。

如需修改集群,您可以使用 Google Cloud 控制台、Google Cloud CLI、客户端库或 Managed Kafka API。您无法使用开源 Apache Kafka API 更新集群。

准备工作

如果您更新 vCPU 数量或内存,则需遵守以下规则:

  • 集群的总体 vCPU 与内存比率必须始终保持在 1:1 到 1:8 之间。

  • 如果缩减规模,每个现有代理必须至少有 1 个 vCPU 和 1 GiB 内存。代理的数量永远不会减少。

  • 如果进行扩容,并且更改导致添加了新的代理,则每个代理的平均 vCPU 和内存与更新前的平均值相比,降幅不得超过 10%。

    例如,如果您尝试将集群从 45 个 vCPU(3 个代理)向上扩展到 48 个 vCPU(4 个代理),则操作会失败。这是因为每个代理的平均 vCPU 从 15 减少到 12,减少了 20%,超过了 10% 的限制。

如需了解详情,请参阅更新集群规模

更新某些属性(例如 vCPU 数量和内存)可能需要服务重启集群。集群会一次重启一个代理。 这会导致向各个代理发出的请求暂时失败,但这些失败是短暂的。常用的客户端库会自动处理此类错误。

您无法修改集群名称、集群位置或加密类型。

修改集群所需的角色和权限

如需获得更新集群所需的权限,请让您的管理员为您授予项目的 Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

此预定义角色包含更新集群所需的权限。如需查看所需的确切权限,请展开所需权限部分:

所需权限

如需更新集群,您需要具备以下权限:

  • 修改集群: managedkafka.clusters.update

您也可以使用自定义角色或其他预定义角色来获取这些权限。

借助 Managed Kafka Cluster Editor 角色,您无法在 Managed Service for Apache Kafka 集群上创建、删除或修改主题和消费者群组。它也不允许数据平面访问在集群内发布或使用消息。如需详细了解此角色,请参阅 Managed Service for Apache Kafka 预定义角色

修改集群

如需修改集群,请按以下步骤操作:

控制台

  1. 在 Google Cloud 控制台中,前往集群页面。

    转到“集群”

  2. 在集群列表中,点击要修改其属性的集群。

    系统会显示集群详情页面。

  3. 在集群详情页面中,点击修改

  4. 根据需要修改属性。您可以在控制台中修改集群的以下属性:

    • 内存
    • vCPU
    • 子网
    • 重新平衡配置
    • mTLS 配置
    • 标签
  5. 点击保存

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 运行 gcloud managed-kafka clusters update 命令:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    替换以下内容:

    • CLUSTER_ID:集群的 ID 或名称。您无法更新此值。
    • LOCATION:集群的位置。您无法更新此值。
    • CPU:集群的虚拟 CPU 数量。
    • MEMORY:集群的内存量。 使用“MB”“MiB”“GB”“GiB”“TB”或“TiB”单位。例如,“10GiB”。
    • SUBNETS:要连接到的子网列表。 使用英文逗号分隔多个子网值。
    • auto-rebalance:当集群中的 CPU 数量发生变化时,启用在代理之间自动重新平衡主题分区的功能。此功能默认处于启用状态。
    • LABELS:要与集群关联的标签。
  3. 如果您在命令中使用 --async 标志,系统会发送更新请求并立即返回响应,而无需等待操作完成。借助 --async 标志,您可以在集群更新在后台进行时继续执行其他任务。如果不使用 --async 标志,系统会等待操作完成,然后再返回响应。您必须等到集群完全更新后才能继续执行其他任务。

    REST

    在使用任何请求数据之前,请先进行以下替换:

    • PROJECT_ID:您的 Google Cloud 项目 ID
    • LOCATION:集群的位置
    • CLUSTER_ID:集群的 ID
    • UPDATE_MASK:要更新的字段,以完全限定名称的英文逗号分隔列表的形式表示。示例: capacityConfig.vcpuCount,capacityConfig.memoryBytes
    • CPU_COUNT:集群的 vCPU 数量
    • MEMORY:集群的内存量(以字节为单位)
    • SUBNET_ID:要连接到的子网的子网 ID

    HTTP 方法和网址:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

    请求 JSON 正文:

    {
      "capacityConfig": {
        "vcpuCount": CPU_COUNT,
        "memoryBytes": MEMORY
      },
      "gcpConfig": {
        "accessConfig": {
          "networkConfigs": [
            {
              "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
            }
          ]
        }
      }
    }
    

    如需发送您的请求,请展开以下选项之一:

    您应该收到类似以下内容的 JSON 响应:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    在请求正文中,仅包含您要更新的字段,如 UPDATE_MASK 查询参数中所指定的那样。如需添加子网,请向 networkConfigs 追加一个新条目。

    Go

    在尝试此示例之前,请按照 安装客户端库中的 Go 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Go API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据(ADC)。 如需了解详情,请参阅为本地开发环境设置 ADC

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// memoryBytes := 4221225472
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memory,
    	}
    	cluster := &managedkafkapb.Cluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    	}
    	paths := []string{"capacity_config.memory_bytes"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateClusterRequest{
    		UpdateMask: updateMask,
    		Cluster:    cluster,
    	}
    	op, err := client.UpdateCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    在尝试此示例之前,请按照 安装客户端库中的 Java 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Java API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅 为本地开发环境设置 ADC

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.Cluster;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        Cluster cluster =
            Cluster.newBuilder()
                .setName(ClusterName.of(projectId, region, clusterId).toString())
                .setCapacityConfig(capacityConfig)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
            settingsBuilder.build())) {
          UpdateClusterRequest request =
              UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
          OperationFuture<Cluster, OperationMetadata> future =
              managedKafkaClient.updateClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          Cluster response = future.get();
          System.out.printf("Updated cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    在尝试此示例之前,请按照 安装客户端库中的 Python 设置说明进行操作。如需了解详情,请参阅 Managed Service for Apache Kafka Python API 参考文档

    如需向 Managed Service for Apache Kafka 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置 ADC

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # memory_bytes = 4295000000
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
    request = managedkafka_v1.UpdateClusterRequest(
        update_mask=update_mask,
        cluster=cluster,
    )
    
    try:
        operation = client.update_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e.message}")
    

后续步骤

Apache Kafka® 是 Apache Software Foundation 或其关联公司在美国和/或其他国家/地区的注册商标。