更新連結叢集

您可以編輯 Connect 叢集,更新 vCPU 數量、記憶體、網路和標籤等屬性。

如要編輯 Connect 叢集,可以使用 Google Cloud 控制台、gcloud CLI、用戶端程式庫或 Managed Kafka API。您無法使用開放原始碼 Apache Kafka API 更新 Connect 叢集。

事前準備

並非所有 Connect 叢集的屬性都能編輯。 更新前請先查看 Connect 叢集的屬性

編輯 Connect 叢集所需的角色和權限

如要取得編輯 Connect 叢集所需的權限,請要求管理員授予您專案的代管 Kafka Connect 叢集編輯者 (roles/managedkafka.connectClusterEditor) IAM 角色。如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

這個預先定義的角色具備編輯 Connect 叢集所需的權限。如要查看確切的必要權限,請展開「Required permissions」(必要權限) 部分:

所需權限

如要編輯 Connect 叢集,必須具備下列權限:

  • 在指定位置授予更新的 Connect 叢集權限: managedkafka.connectClusters.update
  • 在指定位置授予檢視 Connect 叢集的權限。只有透過控制台更新連結叢集時,才需要這項權限: managedkafka.connectors.list Google Cloud

您或許還可透過自訂角色或其他預先定義的角色取得這些權限。

如要進一步瞭解這個角色,請參閱「Managed Service for Apache Kafka 預先定義的角色」。

編輯連結叢集

更新 CPU 和記憶體等特定屬性時,需要重新啟動叢集。

叢集重新啟動會保留資料,但可能會增加延遲。叢集中的初始工作站數量會決定重新啟動時間長度。

您可以更新下列 Connect 叢集屬性:

屬性 可編輯
vCPU
記憶體
網路
工作人員子網路
可解析的 DNS 網域 是 (新增/刪除)
Connect 叢集名稱
Kafka 叢集
位置
標籤 是 (新增/編輯/刪除)
密鑰 是 (新增/刪除)

控制台

  1. 前往 Google Cloud 控制台的「Connect Clusters」(連結叢集) 頁面。

    前往「Connect Clusters」(連結叢集)

  2. 按一下要更新的 Connect 叢集。

    系統隨即會顯示「Connect cluster details」(連線叢集詳細資料) 頁面。

  3. 按一下 [編輯]

    系統隨即顯示「Edit Kafka Connect cluster」(編輯 Kafka Connect 叢集) 頁面。

  4. 對可編輯的屬性進行必要變更。

  5. 按一下 [儲存]

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 請執行 gcloud managed-kafka connect-clusters update 指令:

    gcloud managed-kafka connect-clusters update CONNECT_CLUSTER_ID \
        --location=LOCATION \
        [--cpu=CPU --memory=MEMORY
         | --clear-dns-names \
         | --dns-name=DNS_NAME --clear-labels \
         | --labels=LABELS --clear-secrets \
         | --secret=SECRET [--primary-subnet=WORKER_SUBNET \
        [--async]
    

    更改下列內容:

    • CONNECT_CLUSTER_ID:Connect 叢集的 ID 或名稱。Connect 叢集的名稱無法變更。
    • LOCATION:Connect 叢集的位置。 Connect 叢集的位置無法變更。
    • CPU:Connect 叢集的 vCPU 數量。最小值為 3 個 vCPU。
    • MEMORY:Connect 叢集的記憶體量。請使用「MB」、「MiB」、「GB」、「GiB」、「TB」或「TiB」單位。例如「10GiB」。每個 vCPU 必須佈建 1 GiB 至 8 GiB 的記憶體。

    • DNS_NAME:子網路網路的 DNS 網域名稱,可供 Connect 叢集查看。
    • LABELS:(選用) 要與叢集建立關聯的標籤。如要進一步瞭解標籤格式,請參閱「標籤」。要新增的標籤 KEY=VALUE 組合清單。鍵的開頭須為小寫字元,而且只能包含連字號 (-)、底線 (_)、小寫字元和數字。值只能包含連字號 (-)、底線 (_)、小寫字元和數字。
    • SECRET:(選用) 要載入工作人員的密鑰。 必須提供 Secret Manager 的確切 Secret 版本,不支援別名。一個叢集最多可載入 32 個密鑰。格式: projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID
    • WORKER_SUBNET:Connect 叢集的工作站子網路。worker 子網路必須與 Connect 叢集位於相同區域。

      子網路的格式為 projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID

  3. Go

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Go 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Go API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證(ADC)。 詳情請參閱「為本機開發環境設定 ADC」。

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnectCluster(w io.Writer, projectID, region, clusterID string, memoryBytes int64, labels map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	// memoryBytes := 25769803776 // 24 GiB in bytes
    	// labels := map[string]string{"environment": "production"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, clusterID)
    
    	// Capacity configuration update
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memoryBytes,
    	}
    
    	connectCluster := &managedkafkapb.ConnectCluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    		Labels:         labels,
    	}
    	paths := []string{"capacity_config.memory_bytes", "labels"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectClusterRequest{
    		UpdateMask:     updateMask,
    		ConnectCluster: connectCluster,
    	}
    	op, err := client.UpdateConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnectCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connect cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Java 設定操作說明進行操作。詳情請參閱 Managed Service for Apache Kafka Java API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「 為本機開發環境設定 ADC」。

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateConnectClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateConnectCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateConnectCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        ConnectCluster connectCluster = ConnectCluster.newBuilder()
            .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateConnectClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create(
            settingsBuilder.build())) {
          UpdateConnectClusterRequest request = UpdateConnectClusterRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConnectCluster(connectCluster).build();
          OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
              .updateConnectClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateConnectCluster contains sample
          // code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf(
              "Connect cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          ConnectCluster response = future.get();
          System.out.printf("Updated connect cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaConnectClient.updateConnectCluster got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    在試用這個範例之前,請先按照「 安裝用戶端程式庫」中的 Python 設定說明操作。詳情請參閱 Managed Service for Apache Kafka Python API 參考說明文件

    如要向 Managed Service for Apache Kafka 進行驗證,請設定應用程式預設憑證。詳情請參閱「為本機開發環境設定 ADC」。

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import ConnectCluster
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # memory_bytes = 4295000000
    
    connect_client = ManagedKafkaConnectClient()
    
    connect_cluster = ConnectCluster()
    connect_cluster.name = connect_client.connect_cluster_path(
        project_id, region, connect_cluster_id
    )
    connect_cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/create-connect-cluster#properties.
    request = managedkafka_v1.UpdateConnectClusterRequest(
        update_mask=update_mask,
        connect_cluster=connect_cluster,
    )
    
    try:
        operation = connect_client.update_connect_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        response = operation.result()
        print("Updated Connect cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

後續步驟

Apache Kafka® 是 The Apache Software Foundation 或其關聯企業在美國與/或其他國家/地區的註冊商標。