更新 Google Cloud Managed Service for Apache Kafka 叢集

您可以編輯 Managed Service for Apache Kafka 叢集,更新叢集大小 (vCPU 數量和記憶體)、已連線的子網路清單、自動重新平衡設定和 mTLS 設定等屬性。

如要編輯叢集,可以使用 Google Cloud 控制台、Google Cloud CLI、用戶端程式庫或 Managed Kafka API。您無法使用開放原始碼 Apache Kafka API 更新叢集。

更新特定屬性 (例如 vCPU 數量和記憶體) 時,服務可能需要重新啟動叢集。叢集會一次重新啟動一個代理程式。在此期間,對個別代理商的要求可能會失敗,但這些失敗是暫時性的。常用的用戶端程式庫會自動處理這些錯誤。

必要角色和權限

如要取得更新叢集所需的權限,請要求管理員授予您專案的代管 Kafka 叢集編輯者 (roles/managedkafka.clusterEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備更新叢集所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要更新叢集,您必須具備下列權限:

  • 編輯叢集: managedkafka.clusters.update

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

調整叢集大小

更新叢集的 vCPU 數量或記憶體時,請遵守下列規則:

  • 叢集的整體 vCPU 與記憶體比例必須介於 1:1 和 1:8 之間。

  • 如果縮減規模,每個現有代理程式必須至少有 1 個 vCPU 和 1 GiB 的記憶體。代理程式數量不會減少。

  • 如果擴充規模,且變更會導致新增中介服務,與更新前的平均值相比,每個中介服務的平均 vCPU 和記憶體不得減少超過 10%。

    舉例來說,如果您嘗試將叢集從 45 個 vCPU (3 個代理程式) 擴充至 48 個 vCPU (4 個代理程式),作業就會失敗。這是因為每個代理程式的平均 vCPU 數量從 15 個減少至 12 個,減少幅度為 20%,超過 10% 的限制。

詳情請參閱「更新叢集大小」。

編輯叢集

如要編輯叢集,請按照下列步驟操作:

控制台

  1. 前往 Google Cloud 控制台的「Clusters」(叢集) 頁面。

    前往「Clusters」(叢集)

  2. 在叢集清單中,按一下要編輯屬性的叢集。

    系統隨即會顯示叢集詳細資料頁面。

  3. 在叢集詳細資料頁面中,按一下「編輯」

  4. 視需要編輯屬性。您可以在控制台編輯叢集的下列屬性:

    • 記憶體
    • vCPU
    • 子網路
    • 重新平衡設定
    • mTLS 設定
    • 標籤
  5. 按一下 [儲存]

gcloud

  1. 在 Google Cloud 控制台中啟用 Cloud Shell。

    啟用 Cloud Shell

    Google Cloud 主控台底部會開啟一個 Cloud Shell 工作階段,並顯示指令列提示。Cloud Shell 是已安裝 Google Cloud CLI 的殼層環境,並已針對您目前的專案設定好相關值。工作階段可能要幾秒鐘的時間才能初始化。

  2. 執行 gcloud managed-kafka clusters update 指令:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    更改下列內容:

    • CLUSTER_ID:叢集的 ID 或名稱。您無法更新這個值。
    • LOCATION:叢集位置。您無法更新這個值。
    • CPU:叢集的虛擬 CPU 數量。
    • MEMORY:叢集的記憶體量。 請使用「MB」、「MiB」、「GB」、「GiB」、「TB」或「TiB」單位。例如「10GiB」。
    • SUBNETS:要連線的子網路清單。 如有多個子網路值,請使用半形逗號分隔。
    • auto-rebalance:在叢集中的 CPU 數量變更時,啟用代理程式間的主題分區自動重新平衡功能。這項功能預設為啟用。
    • LABELS:要與叢集建立關聯的標籤。

如果在指令中使用 --async 旗標,系統會傳送更新要求並立即傳回回應,不會等待作業完成。使用 --async 標記時,叢集更新會在背景執行,您可繼續進行其他工作。如果您未使用 --async 旗標,系統會等待作業完成,然後再傳回回應。您必須等到叢集完全更新,才能繼續執行其他工作。

REST

使用任何要求資料之前,請先修改下列項目的值:

  • PROJECT_ID:您的 Google Cloud 專案 ID
  • LOCATION:叢集位置
  • CLUSTER_ID:叢集 ID
  • UPDATE_MASK:要更新的欄位,以半形逗號分隔的完整名稱清單表示。示例: capacityConfig.vcpuCount,capacityConfig.memoryBytes
  • CPU_COUNT:叢集的 vCPU 數量
  • MEMORY:叢集的記憶體量,以位元組為單位
  • SUBNET_ID:要連線的子網路子網路 ID

HTTP 方法和網址:

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

JSON 要求主體:

{
  "capacityConfig": {
    "vcpuCount": CPU_COUNT,
    "memoryBytes": MEMORY
  },
  "gcpConfig": {
    "accessConfig": {
      "networkConfigs": [
        {
          "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
        }
      ]
    }
  }
}

請展開以下其中一個選項,以傳送要求:

您應該會收到如下的 JSON 回覆:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

在要求主體中,只加入要更新的欄位,如 UPDATE_MASK 查詢參數所指定。如要新增子網路,請在 networkConfigs 中附加新項目。

Go

在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考文件

如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// memoryBytes := 4221225472
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	capacityConfig := &managedkafkapb.CapacityConfig{
		MemoryBytes: memory,
	}
	cluster := &managedkafkapb.Cluster{
		Name:           clusterPath,
		CapacityConfig: capacityConfig,
	}
	paths := []string{"capacity_config.memory_bytes"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateClusterRequest{
		UpdateMask: updateMask,
		Cluster:    cluster,
	}
	op, err := client.UpdateCluster(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateCluster got err: %w", err)
	}
	resp, err := op.Wait(ctx)
	if err != nil {
		return fmt.Errorf("op.Wait got err: %w", err)
	}
	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
	return nil
}

Java

在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考文件

如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。


import com.google.api.gax.longrunning.OperationFuture;
import com.google.api.gax.longrunning.OperationSnapshot;
import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.TimedRetryAlgorithm;
import com.google.cloud.managedkafka.v1.CapacityConfig;
import com.google.cloud.managedkafka.v1.Cluster;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
import com.google.cloud.managedkafka.v1.OperationMetadata;
import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
import com.google.protobuf.FieldMask;
import java.time.Duration;
import java.util.concurrent.ExecutionException;

public class UpdateCluster {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    long memoryBytes = 25769803776L; // 24 GiB
    updateCluster(projectId, region, clusterId, memoryBytes);
  }

  public static void updateCluster(
      String projectId, String region, String clusterId, long memoryBytes) throws Exception {
    CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
    Cluster cluster =
        Cluster.newBuilder()
            .setName(ClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();

    // Create the settings to configure the timeout for polling operations
    ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
    TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
        RetrySettings.newBuilder()
            .setTotalTimeoutDuration(Duration.ofHours(1L))
            .build());
    settingsBuilder.updateClusterOperationSettings()
        .setPollingAlgorithm(timedRetryAlgorithm);

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
        settingsBuilder.build())) {
      UpdateClusterRequest request =
          UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
      OperationFuture<Cluster, OperationMetadata> future =
          managedKafkaClient.updateClusterOperationCallable().futureCall(request);

      // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
      OperationSnapshot operation = future.getInitialFuture().get();
      System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
          operation.getName(),
          operation.isDone(),
          future.getMetadata().get().toString());

      Cluster response = future.get();
      System.out.printf("Updated cluster: %s\n", response.getName());
    } catch (ExecutionException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考文件

如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

from google.api_core.exceptions import GoogleAPICallError
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# memory_bytes = 4295000000

client = managedkafka_v1.ManagedKafkaClient()

cluster = managedkafka_v1.Cluster()
cluster.name = client.cluster_path(project_id, region, cluster_id)
cluster.capacity_config.memory_bytes = memory_bytes
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("capacity_config.memory_bytes")

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
request = managedkafka_v1.UpdateClusterRequest(
    update_mask=update_mask,
    cluster=cluster,
)

try:
    operation = client.update_cluster(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Updated cluster:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e.message}")

限制

建立 Managed Service for Apache Kafka 叢集後,您就無法更新下列屬性:

  • 叢集名稱
  • 叢集位置
  • 加密類型

雖然無法變更加密類型,但可以輪替加密金鑰

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。