Konektor Pub/Sub Sink melakukan streaming pesan dari topik Kafka ke topik Pub/Sub. Hal ini memungkinkan Anda mengintegrasikan aplikasi berbasis Kafka dengan Pub/Sub, sehingga memfasilitasi arsitektur berbasis peristiwa dan pemrosesan data real-time.
Sebelum memulai
Sebelum membuat konektor Pub/Sub Sink, pastikan Anda memiliki hal berikut:
Buat cluster Managed Service for Apache Kafka untuk cluster Connect Anda. Ini adalah cluster Kafka utama yang terkait dengan cluster Connect. Cluster ini juga merupakan cluster sumber yang membentuk salah satu ujung pipeline konektor.
Buat cluster Connect untuk menghosting konektor Pub/Sub Sink.
Buat dan konfigurasi topik Kafka dalam cluster sumber. Data dipindahkan dari topik Kafka ini ke topik Pub/Sub tujuan.
Peran dan izin yang diperlukan
Untuk mendapatkan izin yang diperlukan untuk membuat konektor Sink Pub/Sub, minta administrator untuk memberi Anda peran IAM berikut di project yang berisi cluster Connect:
-
Managed Kafka Connector Editor (
roles/managedkafka.connectorEditor) -
Pub/Sub:
Pub/Sub Publisher (
roles/pubsub.publisher)
Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses ke project, folder, dan organisasi.
Peran bawaan ini berisi izin yang diperlukan untuk membuat konektor Pub/Sub Sink. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:
Izin yang diperlukan
Izin berikut diperlukan untuk membuat konektor Pub/Sub Sink:
-
Berikan izin pembuatan konektor di cluster Connect induk:
managedkafka.connectors.create
Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.
Untuk mengetahui informasi selengkapnya tentang peran Managed Kafka Connector Editor, lihat Peran standar Managed Service for Apache Kafka.
Jika cluster Managed Service for Apache Kafka Anda berada dalam project yang sama dengan cluster Connect, tidak diperlukan izin lebih lanjut. Jika cluster Connect berada di project lain, lihat Membuat Cluster Connect di project lain.
Memberikan izin untuk memublikasikan ke topik Pub/Sub
Akun layanan cluster Connect, yang mengikuti format
service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com,
memerlukan izin untuk memublikasikan pesan ke
topik Pub/Sub. Untuk melakukannya, berikan
peran Pub/Sub Publisher (roles/pubsub.publisher)
ke akun layanan cluster Connect di project yang berisi
topik Pub/Sub.
Cara kerja konektor Sink Pub/Sub
Konektor Sink Pub/Sub menarik pesan dari satu atau beberapa topik Kafka dan memublikasikannya ke topik Pub/Sub.
Berikut uraian mendetail tentang cara konektor Sink Pub/Sub menyalin data:
Konektor menggunakan pesan dari satu atau beberapa topik Kafka dalam cluster sumber.
Konektor menulis pesan ke ID topik Pub/Sub target yang ditentukan menggunakan properti konfigurasi
cps.topic. Ini adalah properti wajib diisi.Konektor juga memerlukan project Google Cloud yang berisi topik Pub/Sub untuk ditentukan menggunakan properti konfigurasi
cps.project. Ini adalah properti wajib diisi.Konektor juga dapat secara opsional menggunakan endpoint Pub/Sub kustom yang ditentukan dengan menggunakan properti
cps.endpoint. Endpoint default adalah"pubsub.googleapis.com:443".Untuk mengoptimalkan performa, konektor akan mem-buffer pesan sebelum memublikasikannya ke Pub/Sub. Anda dapat mengonfigurasi
maxBufferSize,maxBufferBytes,maxDelayThresholdMs,maxOutstandingRequestBytes, danmaxOutstandingMessagesuntuk mengontrol buffering.Kafka record memiliki tiga komponen: header, kunci, nilai. Konektor menggunakan konverter kunci dan nilai untuk mengubah data pesan Kafka menjadi format yang diharapkan oleh Pub/Sub. Saat menggunakan skema nilai peta atau struct, properti
messageBodyNamemenentukan kolom atau kunci yang akan digunakan sebagai isi pesan Pub/Sub.Konektor dapat menyertakan topik, partisi, offset, dan stempel waktu Kafka sebagai atribut pesan dengan menggunakan properti
metadata.publishyang ditetapkan ketrue.Konektor dapat menyertakan header pesan Kafka sebagai atribut pesan Pub/Sub dengan menggunakan properti
headers.publishyang ditetapkan ketrue.Konektor dapat menyertakan kunci pengurutan untuk pesan Pub/Sub dengan menggunakan properti
orderingKeySource. Opsi untuk nilainya mencakup"none"(default),"key", dan"partition".Properti
tasks.maxmengontrol tingkat paralelisme untuk konektor. Meningkatkantasks.maxdapat meningkatkan throughput, tetapi paralelisme sebenarnya dibatasi oleh jumlah partisi dalam topik Kafka.
Properti konektor Sink Pub/Sub
Saat membuat konektor Pub/Sub Sink, Anda perlu menentukan properti berikut.
Nama konektor
Nama unik untuk konektor dalam cluster Connect. Untuk mengetahui panduan tentang penamaan resource, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka.
Jenis plugin konektor
Pilih Pub/Sub Sink sebagai jenis plugin konektor. Hal ini menentukan arah aliran data, yaitu dari Kafka ke Pub/Sub dan implementasi konektor spesifik yang digunakan. Jika Anda tidak menggunakan antarmuka pengguna untuk mengonfigurasi konektor, Anda juga harus menentukan class konektor.
Topik Kafka
Topik Kafka tempat konektor menggunakan pesan.
Anda dapat menentukan satu atau beberapa topik, atau menggunakan ekspresi reguler untuk mencocokkan beberapa topik. Misalnya, topic.* untuk mencocokkan semua topik yang dimulai dengan "topic". Topik ini harus ada dalam cluster Managed Service for Apache Kafka yang terkait dengan cluster Connect Anda.
Topik Pub/Sub
Topik Pub/Sub yang ada tempat konektor
memublikasikan pesan. Pastikan akun layanan cluster Connect memiliki
peran roles/pubsub.publisher di project topik, seperti
yang dijelaskan dalam Sebelum memulai.
Konfigurasi
Di bagian ini, Anda dapat menentukan properti konfigurasi tambahan khusus konektor.
Karena data dalam topik Kafka dapat memiliki berbagai format seperti Avro, JSON, atau byte mentah, bagian penting dari konfigurasi melibatkan penentuan konverter. Konverter menerjemahkan data dari format yang digunakan di topik Kafka Anda ke format internal Kafka Connect yang standar. Konektor Sink Pub/Sub kemudian mengambil data internal ini dan mengubahnya menjadi format yang diperlukan oleh Pub/Sub sebelum menuliskannya.
Untuk mengetahui informasi umum selengkapnya tentang peran konverter di Kafka Connect, jenis konverter yang didukung, dan opsi konfigurasi umum, lihat konverter.
Berikut beberapa konfigurasi khusus untuk konektor Pub/Sub Sink:
cps.project: Menentukan ID project Google Cloud yang berisi topik Pub/Sub.cps.topic: Menentukan topik Pub/Sub tempat data dipublikasikan.cps.endpoint: Menentukan endpoint Pub/Sub yang akan digunakan.
Untuk mengetahui daftar properti konfigurasi yang tersedia khusus untuk konektor ini, lihat Konfigurasi konektor Sink Pub/Sub.
Membuat konektor Sink Pub/Sub
Sebelum membuat konektor, tinjau dokumentasi untuk Properti konektor Sink Pub/Sub.
Konsol
Di konsol Google Cloud , buka halaman Connect Clusters.
Klik cluster Connect yang konektornya ingin Anda buat.
Halaman Connect cluster details akan ditampilkan.
Klik Create connector.
Halaman Create Kafka connector akan ditampilkan.
Untuk nama konektor, masukkan string.
Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka.
Untuk Connector plugin, pilih Pub/Sub Sink.
Di bagian Topics, pilih Select a list of Kafka topics atau Use a topic regex. Kemudian, pilih atau masukkan topik Kafka yang akan digunakan konektor ini untuk menerima pesan. Topik ini ada di cluster Kafka terkait Anda.
Untuk Select a Cloud Pub/Sub topic, pilih topik Pub/Sub tempat konektor ini memublikasikan pesan. Topik ditampilkan dalam format nama lengkap resource:
projects/{project}/topics/{topic}.(Opsional) Konfigurasi setelan tambahan di bagian Configurations. Di sinilah Anda akan menentukan properti seperti
tasks.max,key.converter, danvalue.converter, seperti yang dibahas di bagian sebelumnya.Pilih Kebijakan mulai ulang tugas. Untuk mengetahui informasi selengkapnya, lihat Kebijakan memulai ulang tugas.
Klik Create.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Jalankan perintah
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEGanti kode berikut:
CONNECTOR_ID: ID atau nama konektor. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama konektor tidak dapat diubah.
LOCATION: Lokasi tempat Anda membuat konektor. Lokasi ini harus sama dengan lokasi tempat Anda membuat Connect cluster.
CONNECT_CLUSTER_ID: ID cluster Connect tempat konektor dibuat.
CONFIG_FILE: Jalur ke file konfigurasi YAML untuk konektor BigQuery Sink.
Berikut adalah contoh file konfigurasi untuk konektor Pub/Sub Sink:
connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector" name: "CPS_SINK_CONNECTOR_ID" tasks.max: "1" topics: "GMK_TOPIC_ID" value.converter: "org.apache.kafka.connect.storage.StringConverter" key.converter: "org.apache.kafka.connect.storage.StringConverter" cps.topic: "CPS_TOPIC_ID" cps.project: "GCP_PROJECT_ID"Ganti kode berikut:
CPS_SINK_CONNECTOR_ID: ID atau nama konektor Pub/Sub Sink. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service for Apache Kafka. Nama konektor tidak dapat diubah.
GMK_TOPIC_ID: ID topik Managed Service for Apache Kafka yang datanya dibaca oleh konektor Pub/Sub Sink.
CPS_TOPIC_ID: ID topik Pub/Sub tempat data dipublikasikan.
GCP_PROJECT_ID: ID project Google Cloud tempat topik Pub/Sub Anda berada.
Terraform
Anda dapat menggunakan resource Terraform untuk membuat konektor.
Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.
Go
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.
Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Java
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.
Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Python
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.
Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Setelah membuat konektor, Anda dapat mengedit, menghapus, menjeda, menghentikan, atau memulai ulang konektor.