更新 Google Cloud Managed Service for Apache Kafka 叢集

您可以編輯 Google Cloud Managed Service for Apache Kafka 叢集,更新 vCPU 數量、記憶體、子網路、加密類型或標籤等屬性。您也可以設定在代理程式新增至叢集時,服務是否要重新平衡代理程式間的分區。這項服務會根據叢集的記憶體和 vCPU 設定,自動建立新的代理程式。

如要編輯叢集,可以使用 Google Cloud 控制台、Google Cloud CLI、用戶端程式庫或 Managed Kafka API。您無法使用開放原始碼 Apache Kafka API 更新叢集。

事前準備

更新 vCPU 數量或記憶體時,請遵守下列規則:

  • 叢集的整體 vCPU 與記憶體比例必須介於 1:1 和 1:8 之間。

  • 如果縮減規模,每個現有代理程式必須至少有 1 個 vCPU 和 1 GiB 的記憶體。經紀人數絕不會減少。

  • 如果擴充規模後新增了代理程式,與更新前相比,每個代理程式的平均 vCPU 和記憶體用量不得減少超過 10%。

    舉例來說,如果您嘗試將叢集從 45 個 vCPU (3 個代理程式) 擴充至 48 個 vCPU (4 個代理程式),作業就會失敗。這是因為每個代理程式的平均 vCPU 數量從 15 個減少至 12 個,減少幅度為 20%,超過 10% 的限制。

詳情請參閱「更新叢集大小」。

更新特定屬性 (例如 vCPU 數量和記憶體) 時,服務可能需要重新啟動叢集。叢集會一次重新啟動一個代理程式。 這會導致對個別代理商的要求暫時失敗,但這些失敗是暫時性的。常用的用戶端程式庫會自動處理這類錯誤。

您無法編輯叢集名稱、叢集位置或加密類型。

編輯叢集所需的角色和權限

如要取得更新叢集所需的權限,請要求管理員授予您專案的代管 Kafka 叢集編輯者 (roles/managedkafka.clusterEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備更新叢集所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要更新叢集,您必須具備下列權限:

  • 編輯叢集: managedkafka.clusters.update

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

您無法透過 Managed Kafka 叢集編輯者角色,在 Managed Service for Apache Kafka 叢集上建立、刪除或修改主題和用戶群組。也不允許資料平面存取權,在叢集內發布或取用訊息。如要進一步瞭解這個角色,請參閱「Managed Service for Apache Kafka 預先定義的角色」。

編輯叢集

如要編輯叢集,請按照下列步驟操作:

控制台

  1. 前往 Google Cloud 控制台的「Clusters」(叢集) 頁面。

    前往「Clusters」(叢集)

  2. 在叢集清單中,按一下要編輯屬性的叢集。

    系統隨即會顯示叢集詳細資料頁面。

  3. 在叢集詳細資料頁面中,按一下「編輯」

  4. 視需要編輯屬性。您可以在管理中心編輯叢集的下列屬性:

    • 記憶體
    • vCPU
    • 子網路
    • 重新平衡設定
    • mTLS 設定
    • 標籤
  5. 按一下 [儲存]

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 執行 gcloud managed-kafka clusters update 指令:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    更改下列內容:

    • CLUSTER_ID:叢集的 ID 或名稱。您無法更新這個值。
    • LOCATION:叢集位置。您無法更新這個值。
    • CPU:叢集的虛擬 CPU 數量。
    • MEMORY:叢集的記憶體量。 請使用「MB」、「MiB」、「GB」、「GiB」、「TB」或「TiB」單位。例如「10GiB」。
    • SUBNETS:要連線的子網路清單。 如有多個子網路值,請使用半形逗號分隔。
    • auto-rebalance:在叢集中的 CPU 數量變更時,啟用代理程式間的主題分區自動重新平衡功能。這項功能預設為啟用。
    • LABELS:要與叢集建立關聯的標籤。
  3. 如果在指令中使用 --async 標記,系統會傳送更新要求並立即傳回回應,不必等待作業完成。使用 --async 標記時,叢集更新會在背景執行,您可繼續進行其他工作。如果您未使用 --async 標記,系統會等待作業完成,然後傳回回應。您必須等到叢集完全更新,才能繼續執行其他工作。

    REST

    使用任何要求資料之前,請先修改下列項目的值:

    • PROJECT_ID:您的 Google Cloud 專案 ID
    • LOCATION:叢集位置
    • CLUSTER_ID:叢集 ID
    • UPDATE_MASK:要更新的欄位,以半形逗號分隔的完整名稱清單表示。示例: capacityConfig.vcpuCount,capacityConfig.memoryBytes
    • CPU_COUNT:叢集的 vCPU 數量
    • MEMORY:叢集的記憶體量,以位元組為單位
    • SUBNET_ID:要連線的子網路子網路 ID

    HTTP 方法和網址:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

    JSON 要求主體:

    {
      "capacityConfig": {
        "vcpuCount": CPU_COUNT,
        "memoryBytes": MEMORY
      },
      "gcpConfig": {
        "accessConfig": {
          "networkConfigs": [
            {
              "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
            }
          ]
        }
      }
    }
    

    請展開以下其中一個選項,以傳送要求:

    您應該會收到如下的 JSON 回覆:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    在要求主體中,只加入要更新的欄位,如 UPDATE_MASK 查詢參數所指定。如要新增子網路,請在 networkConfigs 中附加新項目。

    Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// memoryBytes := 4221225472
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memory,
    	}
    	cluster := &managedkafkapb.Cluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    	}
    	paths := []string{"capacity_config.memory_bytes"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateClusterRequest{
    		UpdateMask: updateMask,
    		Cluster:    cluster,
    	}
    	op, err := client.UpdateCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.Cluster;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        Cluster cluster =
            Cluster.newBuilder()
                .setName(ClusterName.of(projectId, region, clusterId).toString())
                .setCapacityConfig(capacityConfig)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
            settingsBuilder.build())) {
          UpdateClusterRequest request =
              UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
          OperationFuture<Cluster, OperationMetadata> future =
              managedKafkaClient.updateClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          Cluster response = future.get();
          System.out.printf("Updated cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # memory_bytes = 4295000000
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
    request = managedkafka_v1.UpdateClusterRequest(
        update_mask=update_mask,
        cluster=cluster,
    )
    
    try:
        operation = client.update_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e.message}")
    

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。