Konektor Sumber Pub/Sub mengalirkan pesan dari Pub/Sub ke Kafka. Dengan begitu, Anda dapat mengintegrasikan Pub/Sub dengan aplikasi dan pipeline data berbasis Kafka.
Konektor membaca pesan dari langganan Pub/Sub, mengonversi setiap pesan menjadi rekaman Kafka, dan menulis rekaman ke topik Kafka. Secara default, konektor membuat rekaman Kafka sebagai berikut:
- Kunci data Kafka adalah
null. - Nilai rekaman Kafka adalah data pesan Pub/Sub sebagai byte.
- Header rekaman Kafka kosong.
Namun, Anda dapat mengonfigurasi perilaku ini. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi konektor.
Sebelum memulai
Sebelum membuat konektor Sumber Pub/Sub, pastikan Anda memiliki hal berikut:
Topik Pub/Sub dengan langganan.
Topik Kafka dalam cluster Kafka.
Hubungkan cluster. Saat membuat cluster Connect, tetapkan cluster Managed Service for Apache Kafka sebagai cluster Kafka utama.
Peran dan izin yang diperlukan
Untuk mendapatkan izin yang
diperlukan untuk membuat konektor Sumber Pub/Sub,
minta administrator Anda untuk memberi Anda
peran IAM Managed Kafka Connector Editor (roles/managedkafka.connectorEditor)
di project yang berisi cluster Connect.
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Peran bawaan ini berisi izin yang diperlukan untuk membuat konektor Sumber Pub/Sub. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:
Izin yang diperlukan
Izin berikut diperlukan untuk membuat konektor Sumber Pub/Sub:
-
Berikan izin pembuatan konektor di cluster Connect induk:
managedkafka.connectors.create
Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.
Untuk mengetahui informasi selengkapnya tentang peran Editor Konektor Kafka Terkelola, lihat Peran standar Managed Service untuk Apache Kafka.
Jika cluster Managed Service for Apache Kafka Anda berada dalam project yang sama dengan cluster Connect, tidak diperlukan izin lebih lanjut. Jika cluster Connect berada di project lain, lihat Membuat Cluster Connect di project lain.
Memberikan izin untuk membaca dari Pub/Sub
Akun layanan Managed Kafka harus memiliki izin untuk membaca pesan dari langganan Pub/Sub. Berikan peran IAM berikut kepada akun layanan di project yang berisi langganan Pub/Sub:
- Pub/Sub Subscriber (
roles/pubsub.subscriber) - Pub/Sub Viewer (
roles/pubsub.viewer)
Akun layanan Managed Kafka memiliki format berikut:
service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com.
Ganti PROJECT_NUMBER dengan nomor project.
Buat konektor Sumber Pub/Sub
Konsol
Di konsol Google Cloud , buka halaman Connect Clusters.
Klik cluster Connect tempat Anda ingin membuat konektor.
Klik Create connector.
Untuk nama konektor, masukkan string.
Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka.
Untuk Connector plugin, pilih Pub/Sub Source.
Dalam daftar Langganan Cloud Pub/Sub, pilih langganan Pub/Sub. Konektor menarik pesan dari langganan ini. Langganan ditampilkan sebagai nama resource lengkap:
projects/{project}/subscriptions/{subscription}.Di daftar Kafka topic, pilih topik Kafka tempat pesan ditulis.
Opsional: Di kotak Configurations, tambahkan properti konfigurasi atau edit properti default. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi konektor.
Pilih Kebijakan mulai ulang tugas. Untuk mengetahui informasi selengkapnya, lihat Kebijakan memulai ulang tugas.
Klik Create.
gcloud
Jalankan perintah
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEGanti kode berikut:
CONNECTOR_ID: ID atau nama konektor. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama konektor tidak dapat diubah.
LOCATION: Lokasi cluster Connect.
CONNECT_CLUSTER_ID: ID cluster Connect tempat konektor dibuat.
CONFIG_FILE: Jalur ke file konfigurasi YAML atau JSON.
Berikut adalah contoh file konfigurasi:
connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
Ganti kode berikut:
PROJECT_ID: ID project Google Cloud tempat langganan Pub/Sub berada.
PUBSUB_SUBSCRIPTION_ID: ID langganan Pub/Sub untuk menarik data.
KAFKA_TOPIC_ID: ID topik Kafka tempat data ditulis.
Properti konfigurasi cps.project, cps.subscription, dan kafka.topic diperlukan. Untuk opsi konfigurasi tambahan, lihat
Mengonfigurasi konektor.
Terraform
Anda dapat menggunakan resource Terraform untuk membuat konektor.
Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.
Go
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.
Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Java
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.
Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Python
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.
Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Setelah membuat konektor, Anda dapat mengedit, menghapus, menjeda, menghentikan, atau memulai ulang konektor.
Mengonfigurasi konektor
Bagian ini menjelaskan beberapa properti konfigurasi yang dapat Anda tetapkan di konektor.
Untuk mengetahui daftar lengkap properti yang khusus untuk konektor ini, lihat Konfigurasi konektor Sumber Pub/Sub.
Mode tarik
Mode pull menentukan cara konektor mengambil pesan Pub/Sub. Mode berikut didukung:
Mode tarik (default). Pesan diambil dalam batch. Untuk mengaktifkan mode ini, tetapkan
cps.streamingPull.enabled=false.Untuk mengonfigurasi ukuran batch, tetapkan properticps.maxBatchSize.Untuk mengetahui informasi selengkapnya tentang mode pull, lihat Pull API.
Mode Tarik Streaming. Memungkinkan throughput maksimum dan latensi terendah saat mengambil pesan dari Pub/Sub. Untuk mengaktifkan mode ini, tetapkan
cps.streamingPull.enabled=true.Untuk mengetahui informasi selengkapnya tentang mode pull streaming, lihat StreamingPull API.
Jika penarikan streaming diaktifkan, Anda dapat menyesuaikan performa dengan menyetel properti konfigurasi berikut:
cps.streamingPull.flowControlBytes: Jumlah maksimum byte pesan yang belum diproses per tugas.cps.streamingPull.flowControlMessages: Jumlah maksimum pesan yang belum diproses per tugas.cps.streamingPull.maxAckExtensionMs: Jumlah waktu maksimum konektor memperpanjang batas waktu langganan, dalam milidetik.cps.streamingPull.maxMsPerAckExtension: Jumlah waktu maksimum yang digunakan konektor untuk memperpanjang batas waktu langganan per perpanjangan, dalam milidetik.cps.streamingPull.parallelStreams: Jumlah aliran untuk menarik pesan dari langganan.
Endpoint Pub/Sub
Secara default, konektor menggunakan endpoint Pub/Sub global. Untuk
menentukan endpoint, tetapkan properti cps.endpoint ke alamat endpoint.
Untuk mengetahui informasi selengkapnya tentang endpoint, lihat Endpoint Pub/Sub.
Kumpulan data Kafka
Konektor Sumber Pub/Sub mengonversi pesan Pub/Sub menjadi rekaman Kafka. Bagian berikut menjelaskan proses konversi.
Kunci catatan
Pengonversi kunci harus berupa org.apache.kafka.connect.storage.StringConverter.
Secara default, kunci rekaman adalah
null.Untuk menggunakan atribut pesan Pub/Sub sebagai kunci, tetapkan
kafka.key.attributeke nama atribut. Contohnya,kafka.key.attribute=username.Untuk menggunakan kunci pengurutan Pub/Sub sebagai kunci, tetapkan
kafka.key.attribute=orderingKey.
Header rekaman
Secara default, header rekaman kosong.
Jika kafka.record.headers adalah true, atribut pesan Pub/Sub
ditulis sebagai header rekaman. Untuk menyertakan kunci pengurutan, tetapkan
cps.makeOrderingKeyAttribute=true.
Nilai catatan
Jika kafka.record.headers adalah true, atau pesan Pub/Sub tidak memiliki atribut kustom, nilai rekaman adalah data pesan, sebagai array byte.
Tetapkan konverter nilai ke
org.apache.kafka.connect.converters.ByteArrayConverter.
Jika tidak, jika kafka.record.headers adalah false dan pesan memiliki minimal satu
atribut kustom, konektor akan menulis nilai rekaman sebagai struct. Tetapkan
konverter nilai ke org.apache.kafka.connect.json.JsonConverter.
struct berisi kolom berikut:
message: Data pesan Pub/Sub, sebagai byte.Kolom untuk setiap atribut pesan Pub/Sub. Untuk menyertakan kunci pengurutan, tetapkan
cps.makeOrderingKeyAttribute=true.
Misalnya, dengan asumsi pesan memiliki atribut username:
{
"message":"MESSAGE_DATA",
"username":"Alice"
}
Jika value.converter.schemas.enable adalah true, struct mencakup payload dan skema:
{
"schema":
{
"type":"struct",
"fields": [
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"username"
}
],
"optional":false
},
"payload": {
"message":"MESSAGE_DATA",
"username":"Alice"
}
}
Partisi Kafka
Secara default, konektor menulis ke satu partisi dalam topik. Untuk menentukan
jumlah partisi yang ditulis konektor, tetapkan properti kafka.partition.count. Nilai tidak boleh melebihi
jumlah partisi topik.
Untuk menentukan cara konektor menetapkan pesan ke partisi, tetapkan properti
kafka.partition.scheme. Untuk mengetahui informasi selengkapnya, lihat
Konfigurasi konektor Sumber Pub/Sub.