Membuat cluster Connect

Cluster Connect menyediakan lingkungan untuk konektor yang membantu memindahkan data dari deployment Kafka yang ada ke cluster Managed Service untuk Apache Kafka Google Cloud atau memindahkan data dari cluster Managed Service untuk Apache Kafka ke layanan lain atau cluster Kafka lain. Google Cloud Cluster Kafka sekunder dapat berupa cluster Google Cloud Managed Service for Apache Kafka lain, cluster yang dikelola sendiri, atau cluster lokal.

Sebelum memulai

Pastikan Anda telah membuat cluster Managed Service for Apache Kafka. Anda memerlukan nama cluster Managed Service for Apache Kafka yang akan dilampirkan ke cluster Connect.

Setiap cluster Connect dikaitkan dengan cluster Managed Service for Apache Kafka. Cluster ini menyimpan status konektor yang berjalan di cluster Connect.

Peran dan izin yang diperlukan untuk membuat cluster Connect

Untuk mendapatkan izin yang diperlukan guna membuat cluster Connect, minta administrator untuk memberi Anda peran IAM Managed Kafka Connect Cluster Editor (roles/managedkafka.connectClusterEditor) di project Anda. Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Peran bawaan ini berisi izin yang diperlukan untuk membuat cluster Connect. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:

Izin yang diperlukan

Izin berikut diperlukan untuk membuat cluster Connect:

  • Beri izin pembuatan cluster Connect di lokasi yang ditentukan: managedkafka.connectClusters.create

Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.

Untuk mengetahui informasi selengkapnya tentang peran ini, lihat Peran bawaan Managed Service for Apache Kafka.

Principal ACL yang diperlukan

Secara default, cluster Managed Service for Apache Kafka memungkinkan Cluster Connect mengakses resource jika tidak ada ACL yang dikonfigurasi. Hal ini dilakukan dengan menyetel allow.everyone.if.no.acl.found ke true, yang merupakan setelan default.

Namun, jika cluster Managed Service for Apache Kafka telah dikonfigurasi dengan ACL, Cluster Connect tidak akan otomatis memiliki izin baca dan tulis ke resource. Anda harus memberikan izin secara manual.

Akun layanan cluster Connect yang digunakan sebagai akun utama di ACL mengikuti format ini: User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com.

Jika Anda telah mengonfigurasi ACL di cluster Kafka, berikan izin baca dan tulis ke topik serta izin baca ke grup konsumen untuk cluster Connect menggunakan perintah berikut:

/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --operation WRITE --topic *
/bin/kafka-acls.sh \
    --bootstrap-server BOOTSTRAP_ADDR \
    --command-config PATH_TO_CLIENT_PROPERTIES \
    --add \
    --allow-principal User:service-{consumer project number}@gcp-sa-managedkafka.iam.gserviceaccount.com \
    --operation READ --group *

Untuk mengetahui informasi selengkapnya tentang perintah ini, lihat Mengonfigurasi ACL Apache Kafka untuk kontrol akses terperinci.

Membuat cluster Connect di project yang berbeda

Saat Anda membuat cluster Connect, cluster tersebut akan menggunakan agen layanan yang sama dengan cluster Managed Service for Apache Kafka yang berada dalam project yang sama. Jika cluster Managed Service untuk Apache Kafka ini ditetapkan sebagai cluster Kafka utama yang terhubung ke cluster Connect, tidak ada izin tambahan yang diperlukan.

Agen layanan memiliki format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com. Nomor project adalah nomor project yang berisi cluster Connect dan cluster Managed Service for Apache Kafka.

Jika cluster Connect Anda berada di project A dan cluster Managed Service for Apache Kafka terkait berada di project B, ikuti langkah-langkah berikut:

  1. Pastikan Managed Kafka API diaktifkan untuk project A dan project B.

    Mengaktifkan API

  2. Identifikasi agen layanan cluster Connect di project A.

    Agen layanan memiliki format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com.

  3. Di project B, berikan peran Managed Kafka Client role (roles/managedkafka.client) kepada akun layanan cluster Connect.

    Peran ini memberikan izin yang diperlukan untuk terhubung ke cluster Managed Service for Apache Kafka dan melakukan operasi seperti membaca dan menulis data.

    Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Membuat dan memberikan peran untuk agen layanan.

Selalu ikuti prinsip hak istimewa terendah saat memberikan izin. Berikan hanya izin yang diperlukan untuk memastikan keamanan dan mencegah akses yang tidak sah.

Properti cluster Connect

Bagian ini menjelaskan properti cluster Connect.

Nama cluster yang terhubung

Nama cluster Connect yang Anda buat. Untuk mengetahui panduan tentang cara memberi nama cluster Connect, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama cluster tidak dapat diubah.

Cluster Kafka utama

Cluster Managed Service for Apache Kafka yang terkait dengan cluster Connect Anda. Cluster terkait ini (cluster utama) menyimpan status konektor yang berjalan di cluster Connect. Secara umum, cluster Managed Service for Apache Kafka utama juga berfungsi sebagai tujuan untuk semua konektor sumber dan input untuk semua konektor sink yang berjalan di cluster Connect.

Satu cluster Managed Service untuk Apache Kafka dapat memiliki beberapa cluster Connect. Jika Anda memilih cluster Managed Service for Apache Kafka di project yang berbeda, pastikan izin yang sesuai telah dikonfigurasi.

Anda tidak dapat mengupdate ke cluster Kafka lain setelah membuat cluster Connect.

Manfaat kolokasi region untuk latensi dan biaya jaringan

Menempatkan cluster Managed Service untuk Apache Kafka dan Connect di region yang sama akan mengurangi latensi dan biaya jaringan. Misalnya, anggap cluster Managed Service for Apache Kafka Anda berada di region-a dan Anda menggunakan konektor sink untuk menulis data dari cluster Managed Service for Apache Kafka ini (sumber) ke tabel BigQuery (sink) yang juga berada di region-a. Jika Anda men-deploy cluster Connect di region-a, pilihan deployment ini akan meminimalkan latensi untuk operasi penulisan BigQuery dan menghilangkan biaya transfer jaringan antar-region antara cluster Managed Service untuk Apache Kafka dan cluster Connect.

Pertimbangan biaya dan latensi multi-sistem

Kafka Connect menggunakan konektor untuk memindahkan data antar-sistem. Salah satu sisi konektor selalu berinteraksi dengan cluster Managed Service for Apache Kafka. Satu cluster Kafka Connect dapat menjalankan beberapa konektor, yang masing-masing bertindak sebagai sumber (menarik data dari sistem) atau sink (mendorong data ke sistem).

Meskipun cluster Connect di region yang sama dengan cluster Managed Service for Apache Kafka mendapatkan manfaat dari latensi komunikasi yang lebih rendah di antara keduanya, setiap konektor juga berinteraksi dengan sistem lain, seperti tabel BigQuery atau cluster Kafka lainnya. Meskipun cluster Connect dan cluster Managed Service untuk Apache Kafka ditempatkan bersama, sistem lain tersebut dapat berada di region yang berbeda. Hal ini menyebabkan latensi dan biaya yang lebih tinggi. Latensi pipeline keseluruhan bergantung pada lokasi ketiga sistem tersebut: cluster Managed Service untuk Apache Kafka, cluster Connect, dan sistem sumber atau tujuan.

Misalnya, jika cluster Managed Service for Apache Kafka Anda berada di region-a, cluster Connect Anda berada di region-b, dan Anda menggunakan konektor Cloud Storage untuk bucket di region-c, Anda akan ditagih untuk dua hop jaringan (region-a ke region-b, lalu region-b ke region-c, atau sebaliknya, bergantung pada arah konektor).

Pertimbangkan dengan cermat semua region yang terlibat saat merencanakan penempatan cluster Connect untuk mengoptimalkan latensi dan biaya.

Konfigurasi kapasitas

Konfigurasi kapasitas mengharuskan Anda mengonfigurasi jumlah vCPU dan jumlah memori untuk setiap vCPU untuk cluster Connect Anda. Anda dapat memperbarui kapasitas cluster Connect setelah membuatnya. Berikut adalah properti untuk konfigurasi kapasitas:

  • vCPU: Jumlah vCPU yang ditetapkan ke cluster Connect. Nilai minimumnya adalah 3 vCPU.

  • Memori: Jumlah memori yang ditetapkan untuk setiap vCPU. Anda harus menyediakan antara 1 GiB dan 8 GiB per vCPU. Jumlah memori dapat ditingkatkan atau dikurangi dalam batas ini setelah cluster dibuat.

    Misalnya, jika Anda membuat cluster dengan 6 vCPU, memori minimum yang dapat dialokasikan ke cluster adalah 6 GiB (1 GiB per vCPU), dan maksimumnya adalah 48 GiB (8 GiB per vCPU).

vCPU dan memori yang dialokasikan untuk setiap pekerja di cluster Connect memiliki dampak signifikan terhadap performa, kapasitas, dan biaya cluster. Berikut uraian tentang pengaruh vCPU dan memori terhadap cluster Connect.

Jumlah vCPU

  • Kafka Connect membagi pekerjaan konektor menjadi tugas. Setiap tugas dapat memproses data secara paralel. vCPU yang lebih banyak berarti lebih banyak tugas dapat dijalankan secara bersamaan, sehingga menghasilkan throughput yang lebih tinggi.

  • Lebih banyak vCPU akan meningkatkan biaya untuk cluster Connect Anda.

Memori

  • Kafka Connect menggunakan memori untuk mem-buffer data saat mengalir di antara konektor dan Managed Service untuk Apache Kafka. Memori yang lebih besar memungkinkan buffer yang lebih besar. Memori besar dapat meningkatkan throughput, terutama untuk aliran data bervolume tinggi. Konektor yang menangani pesan atau catatan yang sangat besar memerlukan memori yang cukup untuk memprosesnya tanpa mengalami pengecualian OutOfMemoryError.

  • Memori yang lebih besar akan meningkatkan biaya cluster Connect Anda.

  • Jika menggunakan logika transformasi berat, Anda memerlukan alokasi memori yang lebih besar.

Tujuan Anda adalah memilih konfigurasi kapasitas yang tepat untuk cluster Connect Anda. Untuk melakukannya, Anda harus memahami throughput yang dapat ditangani oleh cluster Connect Anda.

Subnet pekerja (primer)

Subnet pekerja, yang juga dikenal sebagai subnet utama, menghubungkan jaringan VPC Anda ke cluster Connect. Subnet ini memungkinkan pekerja cluster menjangkau endpoint sumber dan sink di jaringan konsumen, seperti cluster Managed Service for Apache Kafka atau cluster Kafka yang dihosting sendiri.

Berikut beberapa persyaratan untuk mengonfigurasi subnet pekerja:

  • Subnet pekerja diperlukan.

  • Subnet harus berada di region yang sama dengan cluster Connect.

  • Subnet harus berada di VPC induk yang sama dengan salah satu daftar cluster Kafka utama yang berisi subnet yang terhubung.

  • Rentang CIDR subnet harus memiliki ukuran minimum /22 (1024 alamat).

Worker cluster diberi alamat IP di subnet worker, menggunakan antarmuka Private Service Connect. Pekerja dapat menjangkau tujuan jaringan apa pun yang dapat diakses dari jaringan VPC subnet, dengan persyaratan berikut:

  • Endpoint tidak boleh berada dalam rentang CIDR 172.16.0.0/14. Rentang ini dicadangkan untuk penggunaan internal Managed Service for Apache Kafka Connect.
  • Aturan firewall harus mengizinkan traffic. Lihat Mengonfigurasi keamanan untuk lampiran jaringan.
  • Untuk traffic internet, Anda harus mengonfigurasi Cloud NAT. Misalnya, Cloud NAT diperlukan agar konektor MirrorMaker dapat mereplikasi data dari cluster Kafka yang dapat diakses melalui internet.
  • Untuk mengakses endpoint Private Service Connect yang berada di VPC yang berbeda dengan VPC subnet pekerja, Anda harus memastikan bahwa Anda menggunakan konfigurasi konsumen yang didukung (misalnya, NCC). Untuk mengetahui informasi selengkapnya, lihat Tentang mengakses layanan yang dipublikasikan melalui endpoint.

Domain DNS yang dapat diselesaikan

Domain DNS yang dapat di-resolve, yang juga dikenal sebagai nama domain DNS, memungkinkan alamat DNS di jaringan VPC konsumen tersedia untuk VPC tenant. Hal ini memungkinkan cluster Connect me-resolve nama DNS ke alamat IP, sehingga memfasilitasi komunikasi dengan layanan lain, termasuk cluster Kafka lain untuk konektor MirrorMaker.

Untuk domain DNS yang dapat diselesaikan, Anda dapat memilih cluster Managed Service for Apache Kafka. Anda tidak perlu mengonfigurasi nama domain DNS untuk cluster Managed Service for Apache Kafka primer, karena alamat bootstrap-nya otomatis disertakan dalam daftar domain DNS yang dapat di-resolve.

Namun, Anda juga dapat menentukan domain DNS secara manual, yang diperlukan jika Anda memilih cluster Kafka eksternal. Domain DNS cluster Managed Service for Apache Kafka utama disertakan secara otomatis. Cluster Kafka lainnya masih memerlukan konfigurasi domain DNS.

Resource Secret Manager

Tentukan Secret Manager yang akan dimuat ke dalam worker. Secret ini disimpan dengan aman di Secret Manager dan disediakan untuk cluster Connect Anda.

Anda dapat secara opsional menggunakan Secret Manager dalam konfigurasi konektor. Misalnya, Anda dapat memuat file kunci ke dalam cluster Connect dan membuat konektor membaca file tersebut. Secret Manager dipasang sebagai file di pekerja.

Cluster yang terhubung terintegrasi langsung dengan Secret Manager. Anda harus menggunakan Secret Manager untuk menyimpan dan mengelola secret Anda.

Format untuk menentukan secret adalah: projects/{PROJECT_ID}/secrets/{SECRET_NAME}/versions/{VERSION_ID}

  • PROJECT_ID: ID project tempat secret Secret Manager Anda berada.

  • SECRET_NAME: Nama secret di Secret Manager.

  • VERSION_ID: Nomor versi spesifik secret. Ini adalah angka seperti "1", "2", "3".

Anda dapat memuat hingga 32 rahasia ke dalam satu cluster Connect.

Pastikan agen layanan yang menjalankan pekerja Connect Anda memiliki peran secretmanager.secretAccessor (Secret Manager Secret Accessor) pada secret yang ingin Anda gunakan. Peran ini memungkinkan cluster Connect untuk mengambil nilai rahasia dari Secret Manager.

Label

Label adalah pasangan nilai kunci yang membantu Anda dalam mengatur dan mengidentifikasi. Cluster ini membantu Anda mengatur cluster Connect. Anda dapat melampirkan label ke setiap cluster Connect, lalu memfilter resource berdasarkan labelnya. Contoh label adalah environment:prod, application:web-app.

Membuat cluster Connect

Sebelum membuat cluster, tinjau dokumentasi untuk Menghubungkan properti cluster.

Pembuatan cluster Connect memerlukan waktu 20 hingga 30 menit.

Konsol

  1. Di konsol Google Cloud , buka halaman Connect Clusters.

    Buka Connect Clusters

  2. Klik Create.

    Halaman Create a Connect cluster akan terbuka.

  3. Untuk Connect cluster name, masukkan string.

    Untuk mengetahui informasi selengkapnya tentang cara memberi nama cluster Connect, lihat Pedoman untuk memberi nama resource Managed Service for Apache Kafka.

  4. Untuk Primary Kafka cluster, pilih cluster Managed Service for Apache Kafka dari menu.

    Untuk mengetahui informasi selengkapnya tentang fungsi yang dilakukan oleh cluster Managed Service for Apache Kafka ini, lihat Cluster Kafka utama.

  5. Untuk Lokasi, pilih lokasi yang didukung dari menu Wilayah atau pertahankan nilai default.

    Untuk mengetahui informasi selengkapnya tentang cara memilih lokasi yang tepat, lihat Cluster Kafka utama.

  6. Untuk Konfigurasi kapasitas, masukkan nilai untuk vCPU dan Memori atau pertahankan nilai default.

    Untuk vCPUs, masukkan jumlah CPU virtual untuk cluster.

    Untuk Memori, masukkan jumlah memori per CPU dalam GiB. Pesan error akan ditampilkan jika memori per CPU lebih besar dari 8 GiB.

    Untuk mengetahui informasi selengkapnya tentang cara menentukan ukuran cluster Managed Service for Apache Kafka, lihat Konfigurasi kapasitas.

  7. Untuk Network configuration, dari menu Network, pilih atau pertahankan jaringan cluster Managed Service for Apache Kafka utama.

  8. Untuk Worker subnet, pilih atau pertahankan subnet dari menu.

    Kolom Jalur URI subnet akan terisi otomatis. Untuk mengetahui informasi selengkapnya, lihat Subnet worker.

  9. Untuk Domain DNS yang dapat di-resolve, domain DNS cluster Kafka primer akan otomatis ditambahkan sebagai domain DNS yang dapat di-resolve.

    Untuk menambahkan domain DNS tambahan, perluas bagian ini jika diperlukan.

  10. Klik Tambahkan domain DNS.

    Pilih cluster Kafka dari menu.

    Domain DNS akan diisi secara otomatis. Anda juga dapat mengetik nama domain DNS untuk cluster Kafka eksternal.

    Klik Done.

  11. Untuk Secret Manager resources, luaskan bagian jika diperlukan.

  12. Klik Tambahkan resource rahasia.

  13. Pilih secret dari menu Secret dan versi dari menu Secret version. Anda juga dapat membuat Secret baru.

    Pastikan agen layanan yang menjalankan pekerja Connect Anda memiliki peran Secret Manager Secret Accessor pada secret yang ingin Anda gunakan. Untuk mengetahui informasi selengkapnya tentang Secret Manager, lihat referensi Secret Manager.

  14. Klik Done.

  15. Klik Tambahkan resource secret jika Anda perlu menambahkan lebih banyak secret.

  16. Untuk Label, luaskan bagian jika diperlukan.

    Untuk mengatur project Anda, tambahkan label arbitrer sebagai key-value pair ke resource Anda.

    Klik Tambahkan Label untuk menyertakan lingkungan, layanan, pemilik, tim, dan sebagainya yang berbeda.

  17. Klik Create.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Jalankan perintah gcloud managed-kafka connect-clusters create:

    gcloud managed-kafka connect-clusters create CONNECT_CLUSTER_ID \
        --location=LOCATION \
        --cpu=CPU \
        --memory=MEMORY \
        --primary-subnet=WORKER_SUBNET \
        --kafka-cluster=KAFKA_CLUSTER \
        [--project=PROJECT_ID] \
        [--secret=SECRET] \
        [--dns-name=DNS_DOMAIN_NAME] \
        [--config-file=CONFIG_FILE] \
        [--labels=LABELS]
        [--async]
    

    Ganti kode berikut:

    • CONNECT_CLUSTER_ID: ID atau nama cluster Connect. Untuk mengetahui panduan tentang cara memberi nama cluster Connect, lihat Panduan untuk memberi nama resource Managed Service for Apache Kafka. Nama cluster Connect tidak dapat diubah.

    • LOCATION: Lokasi tempat Anda membuat cluster Connect. Harus berupa region Google Cloudyang didukung. Anda tidak dapat mengubah lokasi cluster Connect setelah dibuat. Untuk daftar lokasi yang tersedia, lihat lokasi Managed Service for Apache Kafka. Untuk mengetahui informasi selengkapnya tentang rekomendasi lokasi, lihat Cluster Kafka utama.

    • CPU: Jumlah vCPU untuk cluster Connect. Nilai minimumnya adalah 3 vCPU. Lihat jumlah vCPU.

    • MEMORY: Jumlah memori untuk cluster Connect. Gunakan satuan "MB", "MiB", "GB", "GiB", "TB", atau "TiB". Misalnya, "3GiB". Anda harus menyediakan antara 1 GiB dan 8 GiB per vCPU. Lihat Memory.

    • WORKER_SUBNET: Subnet pekerja untuk cluster Connect.

      Format subnetnya adalah projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_ID.

      Subnet pekerja harus berada di region yang sama dengan cluster Connect.

    • PROJECT_ID: (Opsional) ID Google Cloud project. Jika tidak diberikan, project saat ini akan digunakan.

    • KAFKA_CLUSTER: ID atau nama yang sepenuhnya memenuhi syarat dari cluster Managed Service for Apache Kafka utama yang terkait dengan cluster Connect. Lihat Cluster Kafka. Format cluster Kafka adalah projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID.

      Anda tidak dapat mengupdate ke cluster Kafka yang berbeda setelah membuat cluster Connect.

    • SECRET: (Opsional) Secret yang akan dimuat ke dalam pekerja. Versi Secret yang tepat dari Secret Manager harus diberikan, alias tidak didukung. Hingga 32 rahasia dapat dimuat ke dalam satu cluster. Format: projects/PROJECT_ID/secrets/SECRET_NAME/versions/VERSION_ID

    • DNS_DOMAIN_NAME: (Opsional) Nama domain DNS dari subnet yang akan dibuat terlihat oleh Connect Cluster. Cluster Connect dapat mengakses resource menggunakan nama domain, bukan mengandalkan alamat IP. Lihat peering DNS.

    • LABELS: (Opsional) Label untuk dikaitkan dengan cluster. Untuk mengetahui informasi selengkapnya tentang format label, lihat Label. Daftar pasangan KEY=VALUE label yang akan ditambahkan. Kunci harus diawali dengan huruf kecil dan hanya berisi tanda hubung (-), garis bawah (_), huruf kecil, dan angka. Nilai hanya boleh berisi tanda hubung (-), garis bawah (_), huruf kecil, dan angka.

    • CONFIG_FILE: (Opsional) Jalur ke file JSON atau YAML yang berisi konfigurasi yang diganti dari default cluster atau konektor. File ini juga mendukung JSON atau YAML inline.

    • --async: (Opsional) Langsung ditampilkan, tanpa menunggu operasi yang sedang berlangsung selesai. Dengan tanda --async, Anda dapat melanjutkan tugas lain saat pembuatan cluster terjadi di latar belakang. Jika Anda tidak menggunakan tanda, sistem akan menunggu hingga operasi selesai sebelum menampilkan respons. Anda harus menunggu hingga cluster diupdate sepenuhnya sebelum dapat melanjutkan tugas lainnya.

    Anda akan mendapatkan respons yang mirip dengan berikut ini:

    Create request issued for: [sample-connectcluster]
    Check operation [projects/test-project/locations/us-east1/operations/operation-1753590328249-63ae19098cc06-64300a0a-06512d02] for status.
    

    Simpan OPERATION_ID untuk melacak progres. Misalnya, nilainya di sini adalah operation-1753590328249-63ae19098cc06-64300a0a-06512d02.

  3. Terraform

    Anda dapat menggunakan resource Terraform untuk membuat cluster Connect.

    resource "google_managed_kafka_connect_cluster" "default" {
      provider           = google-beta
      project            = data.google_project.default.project_id
      connect_cluster_id = "my-connect-cluster-id"
      location           = "us-central1"
      kafka_cluster      = google_managed_kafka_cluster.default.id
      capacity_config {
        vcpu_count   = 12
        memory_bytes = 12884901888 # 12 GiB
      }
      gcp_config {
        access_config {
          network_configs {
            primary_subnet = google_compute_subnetwork.default.id
          }
        }
      }
    }

    Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.

    Go

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.

    Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createConnectCluster(w io.Writer, projectID, region, clusterID, kafkaCluster string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-connect-cluster"
    	// kafkaCluster := "projects/my-project-id/locations/us-central1/clusters/my-kafka-cluster"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	locationPath := fmt.Sprintf("projects/%s/locations/%s", projectID, region)
    	clusterPath := fmt.Sprintf("%s/connectClusters/%s", locationPath, clusterID)
    
    	// Capacity configuration with 12 vCPU and 12 GiB RAM
    	capacityConfig := &managedkafkapb.CapacityConfig{
    		VcpuCount:   12,
    		MemoryBytes: 12884901888, // 12 GiB in bytes
    	}
    
    	// Optionally, you can also specify accessible subnets and resolvable DNS
    	// domains as part of your network configuration. For example:
    	// networkConfigs := []*managedkafkapb.ConnectNetworkConfig{
    	// 	{
    	// 		PrimarySubnet:      primarySubnet,
    	// 		AdditionalSubnets:  []string{"subnet-1", "subnet-2"},
    	// 		DnsDomainNames:     []string{"domain-1", "domain-2"},
    	// 	},
    	// }
    
    	connectCluster := &managedkafkapb.ConnectCluster{
    		Name:           clusterPath,
    		KafkaCluster:   kafkaCluster,
    		CapacityConfig: capacityConfig,
    	}
    
    	req := &managedkafkapb.CreateConnectClusterRequest{
    		Parent:           locationPath,
    		ConnectClusterId: clusterID,
    		ConnectCluster:   connectCluster,
    	}
    	op, err := client.CreateConnectCluster(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnectCluster got err: %w", err)
    	}
    	// The duration of this operation can vary considerably, typically taking 5-15 minutes.
    	resp, err := op.Wait(ctx)
    	if err != nil {
    		return fmt.Errorf("op.Wait got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created connect cluster: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    
    import com.google.api.gax.longrunning.OperationFuture;
    import com.google.api.gax.longrunning.OperationSnapshot;
    import com.google.api.gax.longrunning.OperationTimedPollAlgorithm;
    import com.google.api.gax.retrying.RetrySettings;
    import com.google.api.gax.retrying.RetryingFuture;
    import com.google.api.gax.retrying.TimedRetryAlgorithm;
    import com.google.cloud.managedkafka.v1.CapacityConfig;
    import com.google.cloud.managedkafka.v1.ConnectAccessConfig;
    import com.google.cloud.managedkafka.v1.ConnectCluster;
    import com.google.cloud.managedkafka.v1.ConnectGcpConfig;
    import com.google.cloud.managedkafka.v1.ConnectNetworkConfig;
    import com.google.cloud.managedkafka.v1.CreateConnectClusterRequest;
    import com.google.cloud.managedkafka.v1.LocationName;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectSettings;
    import com.google.cloud.managedkafka.v1.OperationMetadata;
    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    
    public class CreateConnectCluster {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-connect-cluster";
        String subnet = "my-subnet"; // e.g. projects/my-project/regions/my-region/subnetworks/my-subnet
        String kafkaCluster = "my-kafka-cluster"; // The Kafka cluster to connect to
        int cpu = 12;
        long memoryBytes = 12884901888L; // 12 GiB
        createConnectCluster(projectId, region, clusterId, subnet, kafkaCluster, cpu, memoryBytes);
      }
    
      public static void createConnectCluster(
          String projectId,
          String region,
          String clusterId,
          String subnet,
          String kafkaCluster,
          int cpu,
          long memoryBytes)
          throws Exception {
        CapacityConfig capacityConfig = CapacityConfig.newBuilder().setVcpuCount(cpu)
            .setMemoryBytes(memoryBytes).build();
        ConnectNetworkConfig networkConfig = ConnectNetworkConfig.newBuilder()
            .setPrimarySubnet(subnet)
            .build();
        // Optionally, you can also specify additional accessible subnets and resolvable
        // DNS domains as part of your network configuration. For example:
        // .addAllAdditionalSubnets(List.of("subnet-1", "subnet-2"))
        // .addAllDnsDomainNames(List.of("dns-1", "dns-2"))
        ConnectGcpConfig gcpConfig = ConnectGcpConfig.newBuilder()
            .setAccessConfig(ConnectAccessConfig.newBuilder().addNetworkConfigs(networkConfig).build())
            .build();
        ConnectCluster connectCluster = ConnectCluster.newBuilder()
            .setCapacityConfig(capacityConfig)
            .setGcpConfig(gcpConfig)
            .setKafkaCluster(kafkaCluster)
            .build();
    
        // Create the settings to configure the timeout for polling operations
        ManagedKafkaConnectSettings.Builder settingsBuilder = ManagedKafkaConnectSettings.newBuilder();
        TimedRetryAlgorithm timedRetryAlgorithm = OperationTimedPollAlgorithm.create(
            RetrySettings.newBuilder()
                .setTotalTimeoutDuration(Duration.ofHours(1L))
                .build());
        settingsBuilder.createConnectClusterOperationSettings()
            .setPollingAlgorithm(timedRetryAlgorithm);
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient
            .create(settingsBuilder.build())) {
          CreateConnectClusterRequest request = CreateConnectClusterRequest.newBuilder()
              .setParent(LocationName.of(projectId, region).toString())
              .setConnectClusterId(clusterId)
              .setConnectCluster(connectCluster)
              .build();
    
          // The duration of this operation can vary considerably, typically taking
          // between 10-30 minutes.
          OperationFuture<ConnectCluster, OperationMetadata> future = managedKafkaConnectClient
              .createConnectClusterOperationCallable().futureCall(request);
    
          // Get the initial LRO and print details.
          OperationSnapshot operation = future.getInitialFuture().get();
          System.out.printf(
              "Connect cluster creation started. Operation name: %s\nDone: %s\nMetadata: %s\n",
              operation.getName(), operation.isDone(), future.getMetadata().get().toString());
    
          while (!future.isDone()) {
            // The pollingFuture gives us the most recent status of the operation
            RetryingFuture<OperationSnapshot> pollingFuture = future.getPollingFuture();
            OperationSnapshot currentOp = pollingFuture.getAttemptResult().get();
            System.out.printf("Polling Operation:\nName: %s\n Done: %s\n",
                currentOp.getName(),
                currentOp.isDone());
          }
    
          // NOTE: future.get() blocks completion until the operation is complete (isDone
          // = True)
          ConnectCluster response = future.get();
          System.out.printf("Created connect cluster: %s\n", response.getName());
        } catch (ExecutionException e) {
          System.err.printf("managedKafkaConnectClient.createConnectCluster got err: %s\n", 
              e.getMessage());
          throw e;
        }
      }
    }

    Python

    Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.

    Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud import managedkafka_v1
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import ManagedKafkaConnectClient
    from google.cloud.managedkafka_v1.types import ConnectCluster, CreateConnectClusterRequest, ConnectNetworkConfig
    
    # TODO(developer): Update with your values.
    # project_id = "my-project-id"
    # region = "us-central1"
    # connect_cluster_id = "my-connect-cluster"
    # kafka_cluster_id = "my-kafka-cluster"
    # primary_subnet = "projects/my-project-id/regions/us-central1/subnetworks/default"
    # cpu = 12
    # memory_bytes = 12884901888  # 12 GiB
    
    connect_client = ManagedKafkaConnectClient()
    kafka_client = managedkafka_v1.ManagedKafkaClient()
    
    parent = connect_client.common_location_path(project_id, region)
    kafka_cluster_path = kafka_client.cluster_path(project_id, region, kafka_cluster_id)
    
    connect_cluster = ConnectCluster()
    connect_cluster.name = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    connect_cluster.kafka_cluster = kafka_cluster_path
    connect_cluster.capacity_config.vcpu_count = cpu
    connect_cluster.capacity_config.memory_bytes = memory_bytes
    connect_cluster.gcp_config.access_config.network_configs = [ConnectNetworkConfig(primary_subnet=primary_subnet)]
    # Optionally, you can also specify accessible subnets and resolvable DNS domains as part of your network configuration.
    # For example:
    # connect_cluster.gcp_config.access_config.network_configs = [
    #     ConnectNetworkConfig(
    #         primary_subnet=primary_subnet,
    #         additional_subnets=additional_subnets,
    #         dns_domain_names=dns_domain_names,
    #     )
    # ]
    
    request = CreateConnectClusterRequest(
        parent=parent,
        connect_cluster_id=connect_cluster_id,
        connect_cluster=connect_cluster,
    )
    
    try:
        operation = connect_client.create_connect_cluster(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        # Creating a Connect cluster can take 10-40 minutes.
        response = operation.result(timeout=3000)
        print("Created Connect cluster:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")
    

Memantau operasi pembuatan cluster

Anda dapat menjalankan perintah berikut hanya jika Anda menjalankan gcloud CLI untuk membuat cluster Connect.

  • Pembuatan cluster Connect biasanya memerlukan waktu 20-30 menit. Untuk melacak progres pembuatan cluster, perintah gcloud managed-kafka connect-clusters create menggunakan operasi yang berjalan lama (LRO), yang dapat Anda pantau menggunakan perintah berikut:

    gcloud managed-kafka operations describe OPERATION_ID \
        --location=LOCATION
    

    Ganti kode berikut:

    • OPERATION_ID dengan nilai ID operasi dari bagian sebelumnya.
    • LOCATION dengan nilai lokasi dari bagian sebelumnya.

Apa langkah selanjutnya?

Apache Kafka® adalah merek dagang terdaftar milik The Apache Software Foundation atau afiliasinya di Amerika Serikat dan/atau negara lain.