Memperbarui cluster Connect

Anda dapat mengedit cluster Connect untuk memperbarui properti seperti jumlah vCPU, memori, jaringan, dan label.

Untuk mengedit cluster Connect, Anda dapat menggunakan Google Cloud Konsol, gcloud CLI, library klien, atau Managed Kafka API. Anda tidak dapat menggunakan Apache Kafka API open source untuk memperbarui cluster Connect.

Sebelum memulai

Tidak semua properti cluster Connect dapat diedit. Tinjau properti klaster Connect sebelum Anda memperbarui.

Peran dan izin yang diperlukan untuk mengedit cluster Connect

Untuk mendapatkan izin yang diperlukan untuk mengedit cluster Connect, minta administrator Anda untuk memberi Anda peran IAM Managed Kafka Connect Cluster Editor (roles/managedkafka.connectClusterEditor) di project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk mengedit cluster Connect. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk mengedit cluster Connect:

  • Beri pembaruan izin cluster Connect di lokasi yang ditentukan: managedkafka.connectClusters.update
  • Memberi tampilan izin cluster Connect di lokasi yang ditentukan. Izin ini hanya diperlukan untuk memperbarui Cluster Connect menggunakan konsol Google Cloud : managedkafka.connectors.list

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Untuk mengetahui informasi selengkapnya tentang peran ini, lihat Peran bawaan Managed Service for Apache Kafka.

Mengedit cluster Connect

Memperbarui properti tertentu, seperti CPU dan memori, memerlukan mulai ulang cluster.

Mulai ulang cluster akan mempertahankan data, tetapi dapat meningkatkan latensi. Jumlah awal worker dalam cluster menentukan durasi mulai ulang.

Anda dapat memperbarui properti cluster Connect berikut:

Properti Dapat diedit
vCPU Ya
Memori Ya
Jaringan Ya
Subnet Pekerja Ya
Domain DNS yang dapat diselesaikan Ya (Tambahkan/Hapus)
Nama cluster yang terhubung Tidak
Cluster Kafka Tidak
Lokasi Tidak
Label Ya (Menambahkan/Mengedit/Menghapus)
Rahasia Ya (Tambahkan/Hapus)

Konsol

  1. Di konsol Google Cloud , buka halaman Connect Clusters.

    Buka Connect Clusters

  2. Klik cluster Connect yang ingin Anda update.

    Halaman Connect cluster details akan ditampilkan.

  3. Klik Edit.

    Halaman Edit cluster Kafka Connect akan ditampilkan.

  4. Lakukan perubahan yang diperlukan pada properti yang dapat diedit.

  5. Klik Simpan.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud managed-kafka connect-clusters update:

    gcloud managed-kafka connect-clusters update CONNECT_CLUSTER_ID \
        --location=LOCATION \
        [--cpu=CPU --memory=MEMORY
         | --clear-dns-names \
         | --dns-name=DNS_NAME --clear-labels \
         | --labels=LABELS --clear-secrets \
         | --secret=SECRET [--primary-subnet=WORKER_SUBNET \
        [--async]
    

    Ganti kode berikut:

    • CONNECT_CLUSTER_ID: ID atau nama cluster Connect. Nama cluster Connect tidak dapat diubah.
    • LOCATION: Lokasi cluster Connect. Lokasi cluster Connect tidak dapat diubah.
    • CPU: Jumlah vCPU untuk cluster Connect. Nilai minimumnya adalah 3 vCPU.
    • MEMORY: Jumlah memori untuk cluster Connect. Gunakan satuan "MB", "MiB", "GB", "GiB", "TB", atau "TiB". Misalnya, "10GiB". Anda harus menyediakan antara 1 GiB dan 8 GiB per vCPU.

    • DNS_NAME: Nama domain DNS dari jaringan subnet yang akan dibuat terlihat oleh Connect Cluster.
    • LABELS: (Opsional) Label untuk dikaitkan dengan cluster. Untuk mengetahui informasi selengkapnya tentang format label, lihat Label. Daftar pasangan KEY=VALUE label yang akan ditambahkan. Kunci harus diawali dengan karakter huruf kecil dan hanya berisi tanda hubung (-), garis bawah (_), karakter huruf kecil, dan angka. Nilai hanya boleh berisi tanda hubung (-), garis bawah (_), karakter huruf kecil, dan angka.
    • SECRET: (Opsional) Secret yang akan dimuat ke dalam pekerja. Versi Secret yang tepat dari Secret Manager harus diberikan, alias tidak didukung. Hingga 32 rahasia dapat dimuat ke dalam satu cluster. Format: projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID
    • WORKER_SUBNET: Subnet pekerja untuk cluster Connect. Subnet pekerja harus berada di region yang sama dengan cluster Connect.

      Format subnetnya adalah projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

  3. Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConnectCluster(w io.Writer, projectID, region, clusterID string, memoryBytes int64, labels map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	// memoryBytes := 25769803776 // 24 GiB in bytes
    	// labels := map[string]string{"environment": "production"}
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, clusterID)
    
    	// Capacity configuration update
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memoryBytes,
    	}
    
    	connectCluster := &managedkafkapb.ConnectCluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    		Labels:         labels,
    	}
    	paths := []string{"capacity_config.memory_bytes", "labels"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConnectClusterRequest{
    		UpdateMask:     updateMask,
    		ConnectCluster: connectCluster,
    	}
    	op, err := client.UpdateConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConnectCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated connect cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateConnectClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateConnectCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateConnectCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        ConnectCluster connectCluster = ConnectCluster.newBuilder()
            .setName(ConnectClusterName.of(projectId, region, clusterId).toString())
            .setCapacityConfig(capacityConfig)
            .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateConnectClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create(
            settingsBuilder.build())) {
          UpdateConnectClusterRequest request = UpdateConnectClusterRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConnectCluster(connectCluster).build();
          OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
              .updateConnectClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateConnectCluster contains sample
          // code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf(
              "Connect cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          ConnectCluster response = future.get();
          System.out.printf("Updated connect cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaConnectClient.updateConnectCluster got err: %s\n", 
              e.getMessage());
        }
      }
    }
    

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import ConnectCluster
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # memory_bytes = 4295000000
    
    connect_client = ManagedKafkaConnectClient()
    
    connect_cluster = ConnectCluster()
    connect_cluster.name = connect_client.connect_cluster_path(
        project_id, region, connect_cluster_id
    )
    connect_cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-service-for-apache-kafka/docs/connect-cluster/create-connect-cluster#properties.
    request = managedkafka_v1.UpdateConnectClusterRequest(
        update_mask=update_mask,
        connect_cluster=connect_cluster,
    )
    
    try:
        operation = connect_client.update_connect_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result()
        response = operation.result()
        print("Updated Connect cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

Apa langkah selanjutnya?

Apache Kafka® adalah merek dagang terdaftar milik The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.