Memperbarui cluster Google Cloud Managed Service for Apache Kafka

Anda dapat mengedit cluster Layanan Terkelola Google Cloud untuk Apache Kafka guna memperbarui properti seperti jumlah vCPU, memori, subnet, jenis enkripsi, atau label. Anda juga dapat mengonfigurasi apakah layanan menyeimbangkan ulang partisi di seluruh broker saat broker ditambahkan ke cluster. Layanan ini membuat broker baru secara otomatis berdasarkan konfigurasi memori dan vCPU cluster.

Untuk mengedit cluster, Anda dapat menggunakan konsol Google Cloud , Google Cloud CLI, library klien, atau Managed Kafka API. Anda tidak dapat menggunakan Apache Kafka API open source untuk memperbarui cluster.

Sebelum memulai

Jika Anda mengupdate jumlah vCPU atau memori, aturan berikut akan berlaku:

  • Rasio vCPU ke memori keseluruhan cluster harus selalu berada di antara 1:1 dan 1:8.

  • Jika Anda melakukan penurunan skala, harus ada minimal 1 vCPU dan 1 GiB memori untuk setiap broker yang ada. Jumlah broker tidak pernah berkurang.

  • Jika Anda melakukan penskalaan ke atas, dan perubahan tersebut mengakibatkan penambahan broker baru, vCPU dan memori rata-rata per broker tidak boleh berkurang lebih dari 10% dibandingkan dengan rata-rata sebelum update.

    Misalnya, jika Anda mencoba meng-upgrade cluster dari 45 vCPU (3 broker) menjadi 48 vCPU (4 broker), operasi akan gagal. Hal ini karena vCPU rata-rata per broker berkurang dari 15 menjadi 12, yang merupakan pengurangan sebesar 20%, melebihi batas 10%.

Untuk mengetahui informasi selengkapnya, lihat Memperbarui ukuran cluster.

Memperbarui properti tertentu, seperti jumlah vCPU dan memori, mungkin mengharuskan layanan memulai ulang cluster. Cluster dimulai ulang satu broker dalam satu waktu. Hal ini menyebabkan kegagalan sementara permintaan ke broker individual, tetapi kegagalan ini bersifat sementara. Library klien yang umum digunakan menangani error tersebut secara otomatis.

Anda tidak dapat mengedit nama cluster, lokasi cluster, atau jenis enkripsi.

Peran dan izin yang diperlukan untuk mengedit cluster

Untuk mendapatkan izin yang diperlukan guna memperbarui cluster, minta administrator untuk memberi Anda peran IAM Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) di project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk memperbarui cluster. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk memperbarui cluster:

  • Edit cluster: managedkafka.clusters.update

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Peran Editor Cluster Kafka Terkelola tidak memungkinkan Anda membuat, menghapus, atau mengubah topik dan grup konsumen di cluster Managed Service untuk Apache Kafka. Selain itu, akses ini tidak mengizinkan akses bidang data untuk memublikasikan atau menggunakan pesan dalam cluster. Untuk mengetahui informasi selengkapnya tentang peran ini, lihat Peran bawaan Managed Service for Apache Kafka.

Mengedit cluster

Untuk mengedit cluster, ikuti langkah-langkah berikut:

Konsol

  1. Di konsol Google Cloud , buka halaman Clusters.

    Buka Cluster

  2. Dari daftar cluster, klik cluster yang propertinya ingin Anda edit.

    Halaman detail cluster akan ditampilkan.

  3. Di halaman detail cluster, klik Edit.

  4. Edit properti sesuai kebutuhan. Properti cluster berikut dapat diedit dari konsol:

    • Memori
    • vCPU
    • Subnet
    • Konfigurasi penyeimbangan ulang
    • Konfigurasi mTLS
    • Label
  5. Klik Simpan.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud managed-kafka clusters update:

    gcloud managed-kafka clusters update CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --labels=LABELS
    

    Ganti kode berikut:

    • CLUSTER_ID: ID atau nama cluster. Anda tidak dapat memperbarui nilai ini.
    • LOCATION: Lokasi cluster. Anda tidak dapat memperbarui nilai ini.
    • CPU: Jumlah CPU virtual untuk cluster.
    • MEMORY: Jumlah memori untuk cluster. Gunakan satuan "MB", "MiB", "GB", "GiB", "TB", atau "TiB". Misalnya, "10GiB".
    • SUBNETS: Daftar subnet yang akan dihubungkan. Gunakan koma untuk memisahkan beberapa nilai subnet.
    • auto-rebalance: Mengaktifkan penyeimbangan ulang otomatis partisi topik di antara broker saat jumlah CPU dalam cluster berubah. Opsi ini diaktifkan secara default.
    • LABELS: Label yang akan dikaitkan dengan cluster.
  3. Jika Anda menggunakan tanda --async dengan perintah, sistem akan mengirim permintaan update dan langsung menampilkan respons, tanpa menunggu operasi selesai. Dengan tanda --async, Anda dapat melanjutkan tugas lain saat update cluster terjadi di latar belakang. Jika Anda tidak menggunakan flag --async, sistem akan menunggu hingga operasi selesai sebelum menampilkan respons. Anda harus menunggu hingga cluster diupdate sepenuhnya sebelum dapat melanjutkan tugas lainnya.

    REST

    Sebelum menggunakan salah satu data permintaan, lakukan penggantian berikut:

    • PROJECT_ID: Project ID Google Cloud Anda
    • LOCATION: lokasi cluster
    • CLUSTER_ID: ID cluster
    • UPDATE_MASK: kolom yang akan diupdate, sebagai daftar nama yang sepenuhnya memenuhi syarat yang dipisahkan koma. Contoh: capacityConfig.vcpuCount,capacityConfig.memoryBytes
    • CPU_COUNT: jumlah vCPU untuk cluster
    • MEMORY: jumlah memori untuk cluster, dalam byte
    • SUBNET_ID: ID subnet yang akan dihubungkan

    Metode HTTP dan URL:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID?updateMask=UPDATE_MASK

    Meminta isi JSON:

    {
      "capacityConfig": {
        "vcpuCount": CPU_COUNT,
        "memoryBytes": MEMORY
      },
      "gcpConfig": {
        "accessConfig": {
          "networkConfigs": [
            {
              "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
            }
          ]
        }
      }
    }
    

    Untuk mengirim permintaan Anda, perluas salah satu opsi berikut:

    Anda akan melihat respons JSON seperti berikut:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Dalam isi permintaan, sertakan hanya kolom yang Anda perbarui, seperti yang ditentukan dalam parameter kueri UPDATE_MASK. Untuk menambahkan subnet, tambahkan entri baru ke networkConfigs.

    Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateCluster(w io.Writer, projectID, region, clusterID string, memory int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// memoryBytes := 4221225472
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		MemoryBytes: memory,
    	}
    	cluster := &managedkafkapb.Cluster{
    		Name:           clusterPath,
    		CapacityConfig: capacityConfig,
    	}
    	paths := []string{"capacity_config.memory_bytes"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateClusterRequest{
    		UpdateMask: updateMask,
    		Cluster:    cluster,
    	}
    	op, err := client.UpdateCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateCluster got err: %w", err)
    	}
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated cluster: %#v\n", resp)
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.Cluster;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.UpdateClusterRequest;
    import com.google.protobuf.FieldMask;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class UpdateCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        long memoryBytes = 25769803776L; // 24 GiB
        updateCluster(projectId, region, clusterId, memoryBytes);
      }
    
      public static void updateCluster(
          String projectId, String region, String clusterId, long memoryBytes) throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setMemoryBytes(memoryBytes).build();
        Cluster cluster =
            Cluster.newBuilder()
                .setName(ClusterName.of(projectId, region, clusterId).toString())
                .setCapacityConfig(capacityConfig)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("capacity_config.memory_bytes").build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.updateClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
            settingsBuilder.build())) {
          UpdateClusterRequest request =
              UpdateClusterRequest.newBuilder().setUpdateMask(updateMask).setCluster(cluster).build();
          OperationFuture<Cluster, OperationMetadata> future =
              managedKafkaClient.updateClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details. CreateCluster contains sample code for polling logs.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf("Cluster update started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          Cluster response = future.get();
          System.out.printf("Updated cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # memory_bytes = 4295000000
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.memory_bytes = memory_bytes
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("capacity_config.memory_bytes")
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-cluster#properties.
    request = managedkafka_v1.UpdateClusterRequest(
        update_mask=update_mask,
        cluster=cluster,
    )
    
    try:
        operation = client.update_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Updated cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e.message}")
    

Apa langkah selanjutnya?

Apache Kafka® adalah merek dagang terdaftar milik The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.