Konektor Sumber Pub/Sub melakukan streaming pesan dari Pub/Sub ke Kafka, yang memungkinkan Anda mengintegrasikan Pub/Sub dengan aplikasi dan pipeline data berbasis Kafka.
Kasus penggunaan untuk konektor Sumber Pub/Sub mencakup hal berikut:
Penyerapan data real-time. Memublikasikan data dari layanan cloud atau aplikasi lain ke Pub/Sub, lalu mereplikasi data ke Kafka untuk pemrosesan streaming.
Arsitektur berbasis peristiwa. Memicu pemrosesan berbasis Kafka dari pesan yang dipublikasikan ke Pub/Sub.
Konektor membaca pesan dari langganan Pub/Sub, mengonversi setiap pesan menjadi data Kafka, dan menulis data ke topik Kafka. Secara default, konektor membuat data Kafka sebagai berikut:
- Kunci data Kafka adalah
null. - Nilai data Kafka adalah data pesan Pub/Sub sebagai byte.
- Header data Kafka kosong.
Namun, Anda dapat mengonfigurasi perilaku ini. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi konektor.
Sebelum memulai
Sebelum membuat konektor Sumber Pub/Sub, pastikan Anda memiliki hal berikut:
Topik Pub/Sub dengan langganan.
Cluster Managed Service untuk Apache Kafka.
Topik Kafka dalam cluster Kafka.
Cluster Connect . Saat membuat cluster Connect, tetapkan cluster Managed Service untuk Apache Kafka sebagai cluster Kafka utama.
Peran dan izin yang diperlukan
Untuk mendapatkan izin yang diperlukan untuk membuat konektor, minta administrator Anda untuk memberi Anda peran IAM Managed Kafka Connector Editor (roles/managedkafka.connectorEditor) di project Anda.
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Peran bawaan ini berisi izin yang diperlukan untuk membuat konektor. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:
Izin yang diperlukan
Izin berikut diperlukan untuk membuat konektor:
-
Membuat konektor:
managedkafka.connectors.create
Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.
Memberikan izin untuk membaca dari Pub/Sub
Akun layanan Managed Kafka harus memiliki izin untuk membaca pesan dari langganan Pub/Sub. Berikan peran IAM berikut ke akun layanan di project yang berisi langganan Pub/Sub:
- Pub/Sub Subscriber (
roles/pubsub.subscriber) - Pub/Sub Viewer (
roles/pubsub.viewer)
Akun layanan Managed Kafka memiliki format berikut:
service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com,
dengan PROJECT_NUMBER adalah nomor project cluster Connect.
Jika cluster Connect Anda berada di project yang berbeda dengan cluster Managed Service untuk Apache Kafka cluster, lihat Membuat cluster Connect di project yang berbeda.
Membuat konektor Sumber Pub/Sub
Konsol
Di Google Cloud konsol, buka halaman Connect Clusters.
Klik cluster Connect tempat Anda ingin membuat konektor.
Klik Create connector.
Untuk nama konektor, masukkan string.
Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka.
Untuk Connector plugin, pilih Pub/Sub Source.
Dalam daftar Cloud Pub/Sub subscription, pilih langganan Pub/Sub. Konektor mengambil pesan dari langganan ini. Langganan ditampilkan sebagai nama resource lengkap:
projects/{project}/subscriptions/{subscription}.Dalam daftar Kafka topic, pilih topik Kafka tempat pesan ditulis.
Opsional: Di kotak Configurations, tambahkan properti konfigurasi atau edit properti default. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi konektor.
Pilih Task restart policy. Untuk mengetahui informasi selengkapnya, lihat Kebijakan memulai ulang tugas.
Klik Create.
gcloud
Jalankan perintah
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEGanti kode berikut:
CONNECTOR_ID: ID atau nama konektor. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama konektor tidak dapat diubah.
LOCATION: Lokasi cluster Connect.
CONNECT_CLUSTER_ID: ID cluster Connect tempat konektor dibuat.
CONFIG_FILE: Jalur ke file konfigurasi YAML atau JSON.
Berikut adalah contoh file konfigurasi:
connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
Ganti kode berikut:
PROJECT_ID: ID project tempat langganan Pub/Sub berada. Google Cloud
PUBSUB_SUBSCRIPTION_ID: ID langganan Pub/Sub untuk mengambil data.
KAFKA_TOPIC_ID: ID topik Kafka tempat data ditulis.
Properti konfigurasi cps.project, cps.subscription, dan kafka.topic diperlukan. Untuk opsi konfigurasi tambahan, lihat
Mengonfigurasi konektor.
Terraform
Anda dapat menggunakan resource Terraform untuk membuat a konektor.
Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.
Go
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi Managed Service untuk Apache Kafka Go API.
Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Java
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi Managed Service untuk Apache Kafka Java API.
Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Python
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat Dokumentasi referensi Managed Service untuk Apache Kafka Python API.
Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Setelah membuat konektor, Anda dapat mengedit, menghapus, menjeda, menghentikan, atau memulai ulang konektor.
Mengonfigurasi konektor
Bagian ini menjelaskan beberapa properti konfigurasi yang dapat Anda tetapkan pada konektor.
Untuk mengetahui daftar lengkap properti yang khusus untuk konektor ini, lihat Konfigurasi konektor Sumber Pub/Sub.
Mode pull
Mode pull menentukan cara konektor mengambil pesan Pub/Sub. Mode berikut didukung:
Mode pull (default). Pesan diambil dalam batch. Untuk mengaktifkan mode ini, tetapkan
cps.streamingPull.enabled=false.Untuk mengonfigurasi ukuran batch, tetapkan properticps.maxBatchSize.Untuk mengetahui informasi selengkapnya tentang mode pull, lihat Pull API.
Mode Streaming Pull. Mengaktifkan throughput maksimum dan latensi terendah saat mengambil pesan dari Pub/Sub. Untuk mengaktifkan mode ini, tetapkan
cps.streamingPull.enabled=true.Untuk mengetahui informasi selengkapnya tentang mode streaming pull, lihat StreamingPull API.
Jika streaming pull diaktifkan, Anda dapat menyesuaikan performa dengan menetapkan properti konfigurasi berikut:
cps.streamingPull.flowControlBytes: Jumlah maksimum byte pesan yang belum selesai per tugas.cps.streamingPull.flowControlMessages: Jumlah maksimum pesan yang belum selesai per tugas.cps.streamingPull.maxAckExtensionMs: Jumlah waktu maksimum konektor memperpanjang batas waktu langganan, dalam milidetik.cps.streamingPull.maxMsPerAckExtension: Jumlah waktu maksimum konektor memperpanjang batas waktu langganan per perpanjangan, dalam milidetik.cps.streamingPull.parallelStreams: Jumlah streaming untuk mengambil pesan dari langganan.
Endpoint Pub/Sub
Secara default, konektor menggunakan endpoint Pub/Sub global. Untuk menentukan endpoint, tetapkan properti cps.endpoint ke alamat endpoint.
Untuk mengetahui informasi selengkapnya tentang endpoint, lihat
Endpoint Pub/Sub.
Partisi Kafka
Secara default, konektor menulis ke satu partisi dalam topik. Untuk menentukan jumlah partisi yang ditulis konektor, tetapkan properti kafka.partition.count. Nilai tidak boleh melebihi jumlah partisi topik
.
Untuk menentukan cara konektor menetapkan pesan ke partisi, tetapkan properti kafka.partition.scheme. Untuk mengetahui informasi selengkapnya, lihat
Konfigurasi konektor Sumber Pub/Sub.
Pelaku konversi
Tetapkan pengonversi kunci ke org.apache.kafka.connect.storage.StringConverter.
Bergantung pada konfigurasi konektor, tetapkan pengonversi nilai ke salah satu hal berikut:
org.apache.kafka.connect.converters.ByteArrayConverterorg.apache.kafka.connect.json.JsonConverter
Untuk mengetahui informasi selengkapnya, lihat Nilai data.
Konversi pesan
Konektor Sumber Pub/Sub mengonversi pesan Pub/Sub ke data Kafka. Bagian berikut menjelaskan proses konversi.
Kunci data
Pengonversi kunci harus berupa org.apache.kafka.connect.storage.StringConverter.
Secara default, kunci data adalah
null.Untuk menggunakan atribut pesan Pub/Sub sebagai kunci, tetapkan
kafka.key.attributeke nama atribut. Misalnya,kafka.key.attribute=username.Untuk menggunakan kunci pengurutan Pub/Sub sebagai kunci, tetapkan
kafka.key.attribute=orderingKey.
Header data
Secara default, header data kosong.
Jika kafka.record.headers adalah true, atribut pesan Pub/Sub akan ditulis sebagai header data. Untuk menyertakan kunci pengurutan, tetapkan cps.makeOrderingKeyAttribute=true.
Nilai data
Nilai data ditulis sebagai array byte atau sebagai jenis struct.
Nilai data array byte
Jika kafka.record.headers adalah true, atau pesan Pub/Sub tidak memiliki atribut kustom, konektor akan menulis data pesan sebagai array byte.
Tetapkan pengonversi nilai ke org.apache.kafka.connect.converters.ByteArrayConverter.
Nilai data struct
Jika kafka.record.headers adalah false dan pesan memiliki setidaknya satu atribut kustom, konektor akan menulis nilai data sebagai struct. Tetapkan pengonversi nilai ke org.apache.kafka.connect.json.JsonConverter.
struct berisi kolom berikut:
message: Data pesan Pub/Sub, sebagai byte.Kolom untuk setiap atribut pesan Pub/Sub. Untuk menyertakan kunci pengurutan, tetapkan
cps.makeOrderingKeyAttribute=true.
Misalnya, jika pesan memiliki atribut username, nilai data akan terlihat seperti berikut:
{
"message":"MESSAGE_DATA",
"username":"Alice"
}
Jika value.converter.schemas.enable adalah true, struct akan menyertakan payload dan skema:
{
"schema":
{
"type":"struct",
"fields": [
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"username"
}
],
"optional":false
},
"payload": {
"message":"MESSAGE_DATA",
"username":"Alice"
}
}