Membuat cluster Google Cloud Managed Service for Apache Kafka

Cluster Managed Service untuk Apache Kafka menyediakan lingkungan untuk menyimpan dan memproses aliran pesan yang disusun ke dalam topik.

Untuk membuat cluster, Anda dapat menggunakan konsol Google Cloud , Google Cloud CLI, library klien, atau Managed Kafka API. Anda tidak dapat menggunakan API Apache Kafka open source untuk membuat cluster.

Sebelum memulai

Pastikan Anda memahami hal-hal berikut:

Peran dan izin yang diperlukan untuk membuat cluster

Untuk mendapatkan izin yang diperlukan guna membuat cluster, minta administrator untuk memberi Anda peran IAM Managed Kafka Cluster Editor (roles/managedkafka.clusterEditor) di project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk membuat cluster. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk membuat cluster:

  • Buat cluster: managedkafka.clusters.create

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Peran Editor Cluster Kafka Terkelola tidak memungkinkan Anda membuat, menghapus, atau mengubah topik dan grup konsumen di cluster Managed Service untuk Apache Kafka. Selain itu, akses ini tidak mengizinkan akses bidang data untuk memublikasikan atau menggunakan pesan dalam cluster. Untuk mengetahui informasi selengkapnya tentang peran ini, lihat Peran bawaan Managed Service for Apache Kafka.

Properti cluster Managed Service for Apache Kafka

Saat membuat atau memperbarui cluster Managed Service for Apache Kafka, Anda harus menentukan properti berikut.

Nama cluster

Nama atau ID cluster Managed Service for Apache Kafka yang Anda buat. Untuk panduan tentang cara memberi nama cluster, lihat Panduan untuk memberi nama resource Managed Service for Apache Kafka. Nama cluster tidak dapat diubah.

Lokasi

Lokasi tempat Anda membuat cluster. Lokasi harus berada di salah satu region Google Cloud yang didukung. Lokasi cluster tidak dapat diubah nanti. Untuk mengetahui daftar lokasi yang tersedia, lihat lokasi Managed Service for Apache Kafka.

Konfigurasi kapasitas

Konfigurasi kapasitas mengharuskan Anda mengonfigurasi jumlah vCPU dan jumlah memori untuk penyiapan Kafka Anda. Untuk mengetahui informasi selengkapnya tentang cara mengonfigurasi kapasitas cluster, lihat Merencanakan ukuran cluster Kafka.

Berikut adalah properti untuk konfigurasi kapasitas:

  • vCPUs: Jumlah vCPU dalam cluster. Diperlukan minimal 3 vCPU per cluster.

  • Memori: Jumlah memori yang ditetapkan ke cluster. Anda harus menyediakan antara 1 GiB dan 8 GiB per vCPU.

    Misalnya, jika Anda membuat cluster dengan 6 vCPU, memori minimum yang dapat Anda alokasikan ke cluster adalah 6 GiB (1 GiB per vCPU), dan maksimumnya adalah 48 GiB (8 GiB per vCPU).

Untuk mengetahui informasi selengkapnya tentang cara mengubah memori dan jumlah vCPU setelah cluster dibuat, lihat Memperbarui ukuran cluster.

Konfigurasi jaringan

Konfigurasi jaringan adalah daftar subnet VPC tempat cluster dapat diakses. Untuk membuat atau menggunakan pesan, klien harus dapat menjangkau salah satu subnet ini.

Berikut adalah beberapa panduan untuk konfigurasi jaringan Anda:

  • Minimal diperlukan satu subnet untuk cluster. Maksimumnya adalah sepuluh.

  • Tepat satu subnet per jaringan diizinkan untuk cluster tertentu.

  • Setiap subnet harus berada di region yang sama dengan cluster. Project dan jaringan bisa berbeda.

  • Alamat IP untuk broker dan server bootstrap dialokasikan secara otomatis di setiap subnet. Selain itu, entri DNS untuk alamat IP ini dibuat di jaringan VPC yang sesuai.

  • Jika menambahkan subnet dari project lain, Anda harus memberikan izin ke akun layanan yang dikelola Google yang terkait dengan cluster. Untuk informasi selengkapnya, lihat Menghubungkan cluster di seluruh project.

Setelah membuat cluster, Anda dapat memperbarui daftar subnet. Untuk mengetahui informasi selengkapnya tentang jaringan, lihat Mengonfigurasi jaringan untuk Managed Service for Apache Kafka.

Label

Label adalah pasangan nilai kunci yang membantu Anda dalam mengatur dan mengidentifikasi. Label memungkinkan pengategorian resource berdasarkan lingkungan. Contohnya adalah "env:production" dan "owner:data-engineering".

Anda dapat memfilter dan menelusuri resource berdasarkan labelnya. Misalnya, asumsikan Anda memiliki beberapa cluster Managed Service for Apache Kafka untuk departemen yang berbeda. Anda dapat mengonfigurasi dan menelusuri cluster dengan label "department:marketing" untuk menemukan cluster yang relevan dengan cepat.

Konfigurasi penyeimbangan ulang

Setelan ini menentukan apakah layanan secara otomatis menyeimbangkan ulang replika partisi di seluruh broker.

Mode yang tersedia adalah:

  • Penyeimbangan ulang otomatis saat penskalaan: Jika opsi ini diaktifkan, layanan akan otomatis memicu penyeimbangan ulang replika saat Anda melakukan penskalaan cluster. Mode ini membantu mempertahankan distribusi beban yang merata, tetapi mungkin memengaruhi performa untuk sementara selama operasi penyeimbangan ulang.

  • No rebalance: Jika opsi ini diaktifkan, layanan tidak akan menyeimbangkan ulang replika secara otomatis.

Enkripsi

Managed Service for Apache Kafka dapat mengenkripsi pesan dengan Google-owned and Google-managed encryption keys (default) atau Kunci enkripsi yang dikelola pelanggan (CMEK). Setiap pesan dienkripsi saat dalam penyimpanan dan pengiriman. Jenis enkripsi untuk cluster tidak dapat diubah.

Google-owned and Google-managed encryption keys digunakan secara default. Kunci ini dibuat, dikelola, dan disimpan sepenuhnya oleh Google Cloud dalam infrastrukturnya.

CMEK adalah kunci enkripsi yang Anda kelola menggunakan Cloud Key Management Service. Fitur ini memungkinkan Anda memiliki kontrol yang lebih besar atas kunci yang digunakan untuk mengenkripsi data dalam penyimpanan di layanan Google Cloud yang didukung. Penggunaan CMEK akan menimbulkan biaya tambahan terkait Cloud Key Management Service. Untuk penggunaan CMEK, key ring Anda harus berada di lokasi yang sama dengan resource yang Anda gunakan. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi enkripsi pesan.

Konfigurasi mTLS

Secara opsional, Anda dapat mengonfigurasi mTLS sebagai metode autentikasi alternatif yang menggunakan sertifikat klien. Konfigurasi ini mencakup hal berikut:

  • Kumpulan CA: Daftar satu hingga sepuluh kumpulan Certificate Authority Service (CAS) yang dipercaya oleh cluster untuk autentikasi klien.

  • Aturan pemetaan Principal SSL: Properti broker ssl.principal.mapping.rules yang bersifat opsional, tetapi direkomendasikan untuk menyederhanakan nama principal sertifikat yang panjang untuk digunakan dalam ACL Kafka.

Untuk mengetahui informasi selengkapnya tentang mTLS, lihat Mengonfigurasi autentikasi mTLS.

Membuat cluster

Sebelum membuat cluster, tinjau dokumentasi properti cluster.

Pembuatan cluster biasanya memerlukan waktu 20-30 menit.

Untuk membuat cluster, ikuti langkah-langkah berikut:

Konsol

  1. Di konsol Google Cloud , buka halaman Clusters.

    Buka Cluster

  2. Pilih Create.

    Halaman Create Kafka cluster akan terbuka.

  3. Untuk Cluster name, masukkan string.

    Untuk mengetahui informasi selengkapnya tentang cara memberi nama cluster, lihat Panduan untuk memberi nama resource Managed Service for Apache Kafka.

  4. Untuk Location, masukkan lokasi yang didukung.

    Untuk mengetahui informasi selengkapnya tentang lokasi yang didukung, lihat Lokasi Managed Service for Apache Kafka yang didukung.

  5. Untuk Konfigurasi kapasitas, masukkan nilai untuk Memori dan vCPU.

    Untuk mengetahui informasi selengkapnya tentang cara menentukan ukuran cluster Managed Service for Apache Kafka, lihat Merencanakan ukuran cluster Kafka Anda.

  6. Untuk Network configuration, masukkan detail berikut:

    1. Project: Project tempat subnetwork berada. Subnet harus berada di region yang sama dengan cluster, tetapi projectnya mungkin berbeda.
    2. Jaringan: Jaringan tempat subnet terhubung.
    3. Subnetwork: Nama subnet.
    4. Jalur URI subnet: Kolom ini diisi secara otomatis. Atau, Anda dapat memasukkan jalur subnet di sini. Nama subnet harus dalam format: projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.
    5. Klik Done.
  7. (Opsional) Tambahkan subnet tambahan dengan mengklik Tambahkan subnet yang terhubung.

    Anda dapat menambahkan subnet tambahan, hingga nilai maksimum sepuluh.

  8. Pertahankan nilai default lainnya.

  9. Klik Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud managed-kafka clusters create:

    gcloud managed-kafka clusters create CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --subnets=SUBNETS \
        --auto-rebalance \
        --encryption-key=ENCRYPTION_KEY \
        --async \
        --labels=LABELS
    

    Ganti kode berikut:

    • CLUSTER_ID: ID atau nama cluster.

      Untuk mengetahui informasi selengkapnya tentang cara memberi nama cluster, lihat Panduan untuk memberi nama resource Managed Service for Apache Kafka.

    • LOCATION: Lokasi cluster.

      Untuk mengetahui informasi selengkapnya tentang lokasi yang didukung, lihat Lokasi Managed Service for Apache Kafka.

    • CPU: Jumlah vCPU untuk cluster.

      Untuk mengetahui informasi selengkapnya tentang cara menentukan ukuran cluster Managed Service for Apache Kafka, lihat Merencanakan ukuran cluster Kafka Anda.

    • MEMORY: Jumlah memori untuk cluster. Gunakan satuan "MB", "MiB", "GB", "GiB", "TB", atau "TiB". Misalnya, "10GiB".

    • SUBNETS: Daftar subnet yang akan dihubungkan. Gunakan koma untuk memisahkan beberapa nilai subnet.

      Format subnetnya adalah projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

    • auto-rebalance: Mengaktifkan penyeimbangan ulang otomatis partisi topik di antara broker saat jumlah CPU dalam cluster berubah. Opsi ini diaktifkan secara default.

    • ENCRYPTION_KEY: ID kunci enkripsi yang dikelola pelanggan yang akan digunakan untuk cluster.

      Formatnya adalah projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/CRYPTO_KEY.

    • --async: Memungkinkan sistem mengirim permintaan pembuatan dan langsung mengembalikan respons, tanpa menunggu operasi selesai. Dengan tanda --async, Anda dapat melanjutkan tugas lain saat pembuatan cluster terjadi di latar belakang. Jika Anda tidak menggunakan tanda, sistem akan menunggu hingga operasi selesai sebelum menampilkan respons. Anda harus menunggu hingga cluster diupdate sepenuhnya sebelum dapat melanjutkan tugas lainnya.

    • LABELS: Label yang akan dikaitkan dengan cluster.

      Untuk mengetahui informasi selengkapnya tentang format label, lihat Label.

    Anda akan mendapatkan respons yang mirip dengan berikut ini:

    Create request issued for: [CLUSTER_ID]
    Check operation [projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID] for status.
    

    Simpan OPERATION_ID untuk melacak progress.

  3. REST

    Sebelum menggunakan salah satu data permintaan, lakukan penggantian berikut:

    • PROJECT_ID: Project ID Google Cloud Anda
    • LOCATION: lokasi cluster
    • CLUSTER_ID: ID cluster
    • CPU_COUNT: jumlah vCPU untuk cluster
    • MEMORY: jumlah memori untuk cluster, dalam byte. Contoh: 3221225472.
    • SUBNET_ID: ID subnet yang akan dihubungkan. Contoh: default.

    Metode HTTP dan URL:

    POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters?clusterId=CLUSTER_ID

    Meminta isi JSON:

    {
      "capacityConfig": {
        "vcpuCount": CPU_COUNT,
        "memoryBytes": MEMORY
      },
      "gcpConfig": {
        "accessConfig": {
          "networkConfigs": [
            {
              "subnet": "projects/PROJECT_ID/regions/LOCATION/subnetworks/SUBNET_ID"
            }
          ]
        }
      }
    }
    

    Untuk mengirim permintaan Anda, perluas salah satu opsi berikut:

    Anda akan melihat respons JSON yang mirip seperti berikut:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
        "createTime": "CREATE_TIME",
        "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
        "verb": "create",
        "requestedCancellation": false,
        "apiVersion": "v1"
      },
      "done": false
    }
    

    Terraform

    Anda dapat menggunakan resource Terraform untuk membuat cluster.

    resource "google_managed_kafka_cluster" "default" {
      project    = data.google_project.default.project_id # Replace this with your project ID in quotes
      cluster_id = "my-cluster-id"
      location   = "us-central1"
      capacity_config {
        vcpu_count   = 3
        memory_bytes = 3221225472
      }
      gcp_config {
        access_config {
          network_configs {
            subnet = google_compute_subnetwork.default.id
          }
        }
      }
    }

    Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.

    Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createCluster(w io.Writer, projectID, region, clusterID, subnet string, cpu, memoryBytes int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// subnet := "projects/my-project-id/regions/us-central1/subnetworks/default"
    	// cpu := 3
    	// memoryBytes := 3221225472
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
    	clusterPath := fmt.Sprintf("%s/clusters/%s", locationPath, clusterID)
    
    	// Memory must be between 1 GiB and 8 GiB per CPU.
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		VcpuCount:   cpu,
    		MemoryBytes: memoryBytes,
    	}
    	var networkConfig []*managedkafkapb.NetworkConfig
    	networkConfig = append(networkConfig, &managedkafkapb.NetworkConfig{
    		Subnet: subnet,
    	})
    	platformConfig := &managedkafkapb.Cluster_GcpConfig{
    		GcpConfig: &managedkafkapb.GcpConfig{
    			AccessConfig: &managedkafkapb.AccessConfig{
    				NetworkConfigs: networkConfig,
    			},
    		},
    	}
    	rebalanceConfig := &managedkafkapb.RebalanceConfig{
    		Mode: managedkafkapb.RebalanceConfig_AUTO_REBALANCE_ON_SCALE_UP,
    	}
    	cluster := &managedkafkapb.Cluster{
    		Name:            clusterPath,
    		CapacityConfig:  capacityConfig,
    		PlatformConfig:  platformConfig,
    		RebalanceConfig: rebalanceConfig,
    	}
    
    	req := &managedkafkapb.CreateClusterRequest{
    		Parent:    locationPath,
    		ClusterId: clusterID,
    		Cluster:   cluster,
    	}
    	op, err := client.CreateCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateCluster got err: %w", err)
    	}
    	// The duration of this operation can vary considerably, typically taking 10-40 minutes.
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created cluster: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.RetryingFuture;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.AccessConfig;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.Cluster;
    import com.google.cloud.managedkafka.v1.CreateClusterRequest;
    import com.google.cloud.managedkafka.v1.GcpConfig;
    import com.google.cloud.managedkafka.v1.LocationName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaSettings;
    import com.google.cloud.managedkafka.v1.NetworkConfig;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import com.google.cloud.managedkafka.v1.RebalanceConfig;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class CreateCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
        int cpu = 3;
        long memoryBytes = 3221225472L; // 3 GiB
        createCluster(projectId, region, clusterId, subnet, cpu, memoryBytes);
      }
    
      public static void createCluster(
          String projectId, String region, String clusterId, String subnet, int cpu, long memoryBytes)
          throws Exception {
        CapacityConfig capacityConfig =
            CapacityConfig.newBuilder().setVcpuCount(cpu).setMemoryBytes(memoryBytes).build();
        NetworkConfig networkConfig = NetworkConfig.newBuilder().setSubnet(subnet).build();
        GcpConfig gcpConfig =
            GcpConfig.newBuilder()
                .setAccessConfig(AccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
                .build();
        RebalanceConfig rebalanceConfig =
            RebalanceConfig.newBuilder()
                .setMode(RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP)
                .build();
        Cluster cluster =
            Cluster.newBuilder()
                .setCapacityConfig(capacityConfig)
                .setGcpConfig(gcpConfig)
                .setRebalanceConfig(rebalanceConfig)
                .build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaSettings.Builder settingsBuilder = ManagedKafkaSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.createClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create(
            settingsBuilder.build())) {
    
          CreateClusterRequest request =
              CreateClusterRequest.newBuilder()
                  .setParent(LocationName.of(projectId, region).toString())
                  .setClusterId(clusterId)
                  .setCluster(cluster)
                  .build();
    
          // The duration of this operation can vary considerably, typically taking between 10-40
          // minutes.
          OperationFuture<Cluster, OperationMetadata> future =
              managedKafkaClient.createClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf("Cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(),
              operation.isDone(),
              future.getMetadata().get().toString());
    
          while (!future.isDone()) {
            // The pollingFuture gives us the most recent status of the operation
            RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
            OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
            System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
                currentOp.getName(),
                currentOp.isDone());
          }
    
          // NOTE: future.get() blocks completion until the operation is complete (isDone =  True)
          Cluster response = future.get();
          System.out.printf("Created cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaClient.createCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
    # cpu = 3
    # memory_bytes = 3221225472
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    cluster = managedkafka_v1.Cluster()
    cluster.name = client.cluster_path(project_id, region, cluster_id)
    cluster.capacity_config.vcpu_count = cpu
    cluster.capacity_config.memory_bytes = memory_bytes
    cluster.gcp_config.access_config.network_configs = [
        managedkafka_v1.NetworkConfig(subnet=subnet)
    ]
    cluster.rebalance_config.mode = (
        managedkafka_v1.RebalanceConfig.Mode.AUTO_REBALANCE_ON_SCALE_UP
    )
    
    request = managedkafka_v1.CreateClusterRequest(
        parent=client.common_location_path(project_id, region),
        cluster_id=cluster_id,
        cluster=cluster,
    )
    
    try:
        operation = client.create_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        # The duration of this operation can vary considerably, typically taking 10-40 minutes.
        # We can set a timeout of 3000s (50 minutes).
        response = operation.result(timeout=3000)
        print("Created cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e.message}")
    

Memantau operasi pembuatan cluster

Anda dapat menjalankan perintah berikut hanya jika Anda menjalankan gcloud CLI untuk membuat cluster.

  • Pembuatan cluster biasanya memerlukan waktu 20-30 menit. Untuk melacak progres pembuatan cluster, perintah gcloud managed-kafka clusters create menggunakan operasi yang berjalan lama (LRO), yang dapat Anda pantau menggunakan perintah berikut:

    gcloud managed-kafka operations describe OPERATION_ID \
        --location=LOCATION
    

    Ganti kode berikut:

    • OPERATION_ID dengan nilai ID operasi dari bagian sebelumnya.
    • LOCATION dengan nilai lokasi dari bagian sebelumnya.

Pemecahan masalah

Berikut adalah beberapa error yang mungkin Anda alami saat membuat cluster.

Service agent service-${PROJECT_NUMBER}@gcp-sa-managedkafka.iam.gserviceaccount.com has not been granted the required role cloudkms.cryptoKeyEncrypterDecrypter to encrypt data using the KMS key.

Agen layanan Managed Service for Apache Kafka tidak memiliki izin yang diperlukan untuk mengakses kunci Cloud KMS. Lihat dokumentasi peran yang diperlukan untuk mengonfigurasi CMEK.

Service does not have permission to retrieve subnet. Please grant service-${PROJECT_NUMBER}@gcp-sa-managedkafka.iam.gserviceaccount.com the managedkafka.serviceAgent role in the IAM policy of the project ${SUBNET_PROJECT} and ensure the Compute Engine API is enabled in project ${SUBNET_PROJECT}

Agen layanan Managed Service untuk Apache Kafka tidak memiliki peran yang diperlukan untuk mengonfigurasi jaringan di jaringan VPC tempat klien Kafka berjalan. Untuk mengetahui informasi selengkapnya, lihat Menghubungkan cluster di seluruh project.

Apa langkah selanjutnya?

Apache Kafka® adalah merek dagang terdaftar milik The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.