MirrorMaker 2.0 adalah alat yang mereplikasi topik antar-cluster Kafka. Anda dapat membuat konektor MirrorMaker 2.0 berikut:
Sumber MirrorMaker 2.0
Checkpoint MirrorMaker 2.0
Detak Jantung MirrorMaker 2.0
Konektor Sumber MirrorMaker 2.0 selalu diperlukan karena mencerminkan data dari sumber ke cluster target. Ini juga menyinkronkan ACL. Konektor Checkpoint dan Heartbeat MirrorMaker 2.0 bersifat opsional. Anda juga dapat membuat konektor Checkpoint dan Heartbeat MirrorMaker 2.0 tanpa membuat konektor Sumber.
Untuk mengetahui informasi selengkapnya tentang konektor ini, lihat Ringkasan konektor.
Memahami peran cluster di MirrorMaker 2.0
Saat mengonfigurasi MirrorMaker 2.0, Anda harus memahami berbagai peran yang dimainkan oleh cluster Kafka:
Cluster utama: Dalam konteks Managed Service for Apache Kafka, ini adalah cluster Managed Service for Apache Kafka yang terhubung langsung ke cluster Kafka Connect Anda. Cluster Connect menghosting instance konektor MirrorMaker 2.0.
Cluster sekunder: Ini adalah cluster Kafka lainnya yang terlibat dalam replikasi. Cluster ini dapat berupa cluster Managed Service for Apache Kafka lain, atau cluster eksternal. Beberapa contohnya adalah yang dikelola sendiri di Compute Engine, GKE, lokal, atau di cloud lain.
Cluster sumber: Ini adalah cluster Kafka dari mana MirrorMaker 2.0 mereplikasi data.
Cluster target: Ini adalah cluster Kafka yang menjadi tujuan replikasi data MirrorMaker 2.0.
Cluster utama dapat berfungsi sebagai sumber atau target:
Jika cluster utama adalah sumber, cluster sekunder adalah target. Aliran data dari cluster primer ke cluster sekunder.
Jika cluster utama adalah target, cluster sekunder adalah sumber. Data mengalir dari cluster sekunder ke cluster primer.
Untuk meminimalkan latensi operasi tulis, sebaiknya tetapkan cluster target sebagai cluster utama, dan tempatkan cluster Connect di region yang sama dengan cluster target.
Anda harus mengonfigurasi semua properti untuk konektor dengan benar. Setelan ini juga mencakup properti autentikasi produser yang ditujukan ke cluster sekunder. Untuk mengetahui detail tentang potensi masalah, lihat Meningkatkan konfigurasi klien MirrorMaker 2.0.
Sebelum memulai
Untuk membuat konektor MirrorMaker 2.0, selesaikan tugas berikut:
Buat cluster Managed Service for Apache Kafka (primer). Cluster ini berfungsi sebagai salah satu endpoint konektor MirrorMaker 2.0 Anda.
Buat cluster Kafka sekunder. Cluster ini berfungsi sebagai endpoint lainnya. Cluster ini dapat berupa cluster Managed Service untuk Apache Kafka lain atau cluster Kafka eksternal atau yang dikelola sendiri. Anda dapat mengonfigurasi beberapa cluster Kafka sebagai cluster Kafka sekunder dari cluster Connect.
Buat Connect cluster yang menghosting konektor MirrorMaker 2.0 Anda.
Pastikan domain DNS cluster Kafka sekunder dikonfigurasi.
Konfigurasi aturan firewall untuk mengizinkan antarmuka Private Service Connect menjangkau cluster Kafka sumber dan target.
Jika cluster Kafka sumber atau target diakses melalui internet, konfigurasi Cloud NAT untuk mengizinkan pekerja Connect mengakses internet.
Jika cluster sekunder mencakup cluster kafka eksternal atau yang dikelola sendiri, pastikan kredensial yang diperlukan dikonfigurasi sebagai resource secret.
Untuk mengetahui informasi selengkapnya tentang persyaratan jaringan, lihat Subnet pekerja.
Peran dan izin yang diperlukan
Untuk mendapatkan izin yang
diperlukan guna membuat konektor MirrorMaker 2.0,
minta administrator untuk memberi Anda peran IAM
Managed Kafka Connector Editor (roles/managedkafka.connectorEditor)
di project Anda.
Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.
Peran bawaan ini berisi izin yang diperlukan untuk membuat konektor MirrorMaker 2.0. Untuk melihat izin yang benar-benar diperlukan, perluas bagian Izin yang diperlukan:
Izin yang diperlukan
Izin berikut diperlukan untuk membuat konektor MirrorMaker 2.0:
-
Berikan izin pembuatan konektor di cluster Connect induk:
managedkafka.connectors.create
Anda mungkin juga bisa mendapatkan izin ini dengan peran khusus atau peran bawaan lainnya.
Untuk mengetahui informasi selengkapnya tentang peran Managed Kafka Connector Editor, lihat Peran standar Managed Service for Apache Kafka.
Membuat konektor MirrorMaker 2.0 di project lain
Jika cluster Managed Service untuk Apache Kafka utama Anda berada di project yang berbeda dengan Cluster Connect yang menjalankan konektor MirrorMaker 2.0, lihat Membuat Cluster Connect di project yang berbeda.
Menghubungkan ke cluster Kafka sekunder yang dikelola sendiri
Saat terhubung ke cluster Kafka sekunder yang dikelola sendiri, perhatikan jaringan dan autentikasi.
Jaringan: Pastikan setelan jaringan VPC dan aturan firewall yang tepat dikonfigurasi untuk mengizinkan konektivitas antara jaringan VPC cluster Connect dan jaringan yang menghosting cluster eksternal atau yang dikelola sendiri.
Untuk cluster dalam VPC, lihat Membuat dan mengelola jaringan VPC.
Untuk terhubung ke lingkungan lokal atau cloud lainnya, pertimbangkan solusi seperti Cloud VPN atau Cloud Interconnect. Lihat juga panduan khusus untuk menghubungkan ke Kafka lokal.
Autentikasi dan enkripsi: Cluster Connect Anda harus melakukan autentikasi dengan cluster eksternal atau yang dikelola sendiri (jika diperlukan), dan menangani enkripsi TLS. Untuk mengetahui informasi umum tentang autentikasi Kafka, lihat dokumentasi Keamanan Apache Kafka.
Menggunakan Secret Manager untuk kredensial
Cluster yang terhubung terintegrasi langsung dengan Secret Manager. Simpan semua nilai konfigurasi sensitif seperti sandi, serta konten truststore dan keystore yang diperlukan untuk terhubung ke cluster yang dikelola sendiri atau eksternal sebagai secret di Secret Manager.
Secret yang diberikan ke akun layanan cluster Connect akan otomatis dipasang sebagai file dalam lingkungan runtime konektor di direktori
/var/secrets/.Nama file mengikuti pola
{PROJECT_NAME}-{SECRET_NAME}-{SECRET_VERSION}. Anda harus menggunakan nama project, bukan nomor project.Cara Anda mereferensikan secret bergantung pada apakah properti Kafka mengharapkan sandi secret atau jalur ke file:
Untuk sandi, gunakan properti
DirectoryConfigProviderKafka. Tentukan nilai dalam format${directory:/var/secrets}:{SECRET_FILENAME}. Contoh:password=${directory:/var/secrets}:my-project-db-password-1Untuk jalur file, tentukan jalur langsung ke file secret yang dipasang. Contoh:
ssl.truststore.location=/var/secrets/my-project-kafka-truststore-3
Untuk mengetahui detail selengkapnya tentang cara memberikan akses dan mengonfigurasi secret selama pembuatan cluster Connect, lihat Mengonfigurasi secret Secret Manager.
Cara kerja konektor Sumber MirrorMaker
Konektor Sumber MirrorMaker menarik data dari satu atau beberapa topik Kafka di klaster sumber dan mereplikasi data tersebut, beserta ACL, ke topik di klaster target.
Berikut uraian mendetail tentang cara konektor Sumber MirrorMaker mereplikasi data:
Konektor menggunakan pesan dari topik Kafka yang ditentukan dalam cluster sumber. Tentukan topik yang akan direplikasi menggunakan properti konfigurasi
topics, yang menerima nama topik yang dipisahkan koma atau ekspresi reguler gaya Java tunggal. Contohnya,topic-a,topic-bataumy-prefix-.*.Konektor juga dapat melewati replikasi topik tertentu yang Anda tentukan dengan menggunakan properti
topics.exclude; pengecualian menggantikan penyertaan.Konektor menulis pesan yang digunakan ke cluster target.
Konektor memerlukan detail koneksi cluster sumber dan target seperti
source.cluster.bootstrap.serversdantarget.cluster.bootstrap.servers.Konektor juga memerlukan alias untuk cluster sumber dan target seperti yang ditentukan oleh
source.cluster.aliasdantarget.cluster.alias. Secara default, topik yang direplikasi otomatis diganti namanya menggunakan alias sumber. Misalnya, topik bernamaordersdari sumber dengan aliasprimarymenjadiprimary.ordersdi target.ACL yang terkait dengan topik yang direplikasi juga disinkronkan dari cluster sumber ke cluster target. Hal ini dapat dinonaktifkan menggunakan properti
sync.topic.acls.enabled.Detail autentikasi untuk menghubungkan ke cluster sumber dan target harus diberikan dalam konfigurasi jika diperlukan oleh cluster. Anda harus mengonfigurasi properti seperti
security.protocol,sasl.mechanism, dansasl.jaas.config, yang diawali dengansource.cluster.untuk sumber dantarget.cluster.untuk target.Konektor mengandalkan topik internal. Anda mungkin perlu mengonfigurasi properti yang terkait dengan hal ini, seperti
offset-syncs.topic.replication.factor.Konektor menggunakan konverter rekaman Kafka
key.converter,value.converter, danheader.converter. Untuk replikasi langsung, nilai ini sering kali ditetapkan secara default keorg.apache.kafka.connect.converters.ByteArrayConverter, yang tidak melakukan konversi (pass-through).Properti
tasks.maxmengontrol tingkat paralelisme untuk konektor. Meningkatkantasks.maxberpotensi meningkatkan throughput, tetapi paralelisme efektif sering kali dibatasi oleh jumlah partisi dalam topik Kafka sumber yang direplikasi.
Properti konektor MirrorMaker 2.0
Saat Anda membuat atau memperbarui konektor MirrorMaker 2.0, tentukan properti berikut:
Nama konektor
Nama atau ID konektor. Untuk mengetahui panduan tentang cara memberi nama resource, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama tidak dapat diubah.
Jenis konektor
Jenis konektor harus berupa salah satu dari berikut:
Cluster Kafka utama
Cluster Managed Service for Apache Kafka. Sistem akan mengisi kolom ini secara otomatis.
Gunakan cluster Kafka utama sebagai cluster target: Pilih opsi ini untuk memindahkan data dari cluster Kafka lain ke cluster Managed Service for Apache Kafka utama.
Gunakan cluster Kafka utama sebagai cluster sumber: Pilih opsi ini untuk memindahkan data dari cluster Managed Service for Apache Kafka utama ke cluster Kafka lain.
Cluster target atau sumber
Cluster Kafka sekunder yang membentuk ujung lain pipeline.
Cluster Managed Service for Apache Kafka: Pilih cluster dari menu drop-down.
Cluster Kafka yang dikelola sendiri atau eksternal: Masukkan alamat bootstrap dalam format
hostname:port_number. Misalnya:kafka-test:9092
Nama topik atau ekspresi reguler
Topik yang akan direplikasi. Tentukan nama satu per satu (topic1, topic2) atau gunakan
ekspresi reguler (topic.*). Properti ini diperlukan untuk konektor
Sumber MirrorMaker 2.0. Nilai defaultnya adalah .*
Nama grup konsumen atau ekspresi reguler
Grup konsumen yang akan direplikasi. Tentukan nama individual (group1, group2)
atau gunakan ekspresi reguler (group.*). Properti ini diperlukan untuk
konektor Checkpoint MirrorMaker 2.0. Nilai defaultnya adalah .*
Konfigurasi
Di bagian ini, Anda dapat menentukan properti konfigurasi tambahan khusus konektor untuk konektor MirrorMaker 2.0.
Karena data dalam topik Kafka dapat memiliki berbagai format seperti Avro, JSON, atau byte mentah, bagian penting dari konfigurasi melibatkan penentuan konverter. Konverter menerjemahkan data dari format yang digunakan di topik Kafka Anda ke format internal Kafka Connect yang standar.
Untuk mengetahui informasi umum selengkapnya tentang peran konverter di Kafka Connect, jenis konverter yang didukung, dan opsi konfigurasi umum, lihat Konverter.
Beberapa konfigurasi umum untuk semua konektor MirrorMaker 2.0 meliputi:
source.cluster.alias: Alias untuk cluster sumber.target.cluster.alias: Alias untuk cluster target.
Konfigurasi yang digunakan untuk mengecualikan resource tertentu saat mereplikasi data:
topics.exclude: Topik yang dikecualikan. Mendukung nama topik dan ekspresi reguler yang dipisahkan koma. Pengecualian lebih diutamakan daripada penyertaan. Digunakan untuk konektor Sumber MirrorMaker 2.0. Nilai defaultnya adalahmm2.*.internal,.*.replica,__.*groups.exclude: Kecualikan grup. Mendukung ID grup dan ekspresi reguler yang dipisahkan koma. Pengecualian lebih diutamakan daripada penyertaan. Digunakan untuk konektor Checkpoint MirrorMaker 2.0. Nilai defaultnya adalahconsole-consumer-.*,connect-.*,__.*
Konfigurasi autentikasi diperlukan untuk konektor MirrorMaker 2.0.
Jika cluster Kafka sumber atau target adalah cluster Managed Service for Apache Kafka, Connect cluster menggunakan OAuthBearer untuk melakukan autentikasi dengannya. Konfigurasi autentikasi telah dikonfigurasi sebelumnya sehingga Anda tidak perlu menyiapkan konfigurasi secara manual.
Untuk cluster Kafka yang dikelola sendiri atau lokal, konfigurasi autentikasi bergantung pada mekanisme autentikasi yang didukung cluster Kafka. Contoh konfigurasi autentikasi untuk konfigurasi cluster Kafka sumber terlihat seperti berikut:
source.cluster.security.protocol=SASL_SSL
source.cluster.sasl.mechanism=OAUTHBEARER
source.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
Contoh konfigurasi autentikasi untuk konfigurasi cluster Kafka target terlihat seperti berikut:
target.cluster.security.protocol=SASL_SSL
target.cluster.sasl.mechanism=OAUTHBEARER
target.cluster.sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
Properti konfigurasi yang tersedia bergantung pada konektor tertentu. Periksa versi konektor MirrorMaker 2.0 yang didukung untuk melihat contoh konfigurasi yang didukung. Lihat dokumen berikut:
Properti konfigurasi MirrorMaker 2.0 yang umum untuk semua konektor MirrorMaker 2.0
Properti konfigurasi khusus titik pemeriksaan MirrorMaker 2.0
Konversi rekaman Kafka
Kafka Connect menggunakan org.apache.kafka.connect.converters.ByteArrayConverter
sebagai konverter default untuk kunci dan nilai, yang menyediakan opsi
pass-through yang tidak melakukan konversi.
Anda dapat mengonfigurasi header.converter, key.converter, dan value.converter untuk
menggunakan konverter lain.
Jumlah tugas
Nilai tasks.max mengonfigurasi tugas maksimum yang digunakan Kafka Connect untuk menjalankan konektor MirrorMaker. Class ini mengontrol tingkat paralelisme untuk konektor.
Meningkatkan jumlah tugas dapat meningkatkan throughput, tetapi dibatasi oleh
faktor seperti jumlah partisi topik Kafka.
Membuat konektor Sumber MirrorMaker 2.0
Sebelum membuat konektor, tinjau dokumentasi untuk properti konektor.
Konsol
Di konsol Google Cloud , buka halaman Connect Clusters.
Klik cluster Connect tempat Anda ingin membuat konektor.
Halaman Connect cluster details akan ditampilkan.
Klik Buat Konektor.
Halaman Create Kafka Connector akan ditampilkan.
Untuk Nama konektor, masukkan string.
Untuk mengetahui informasi selengkapnya tentang cara memberi nama konektor, lihat Pedoman untuk memberi nama resource Managed Service for Apache Kafka.
Untuk Connector plugin, pilih "MirrorMaker 2.0 Source".
Untuk Primary Kafka cluster, pilih salah satu opsi berikut:
- Gunakan cluster Kafka utama sebagai cluster sumber: Untuk memindahkan data dari cluster Managed Service for Apache Kafka.
- Gunakan cluster Kafka utama sebagai cluster target: Untuk memindahkan data ke cluster Managed Service for Apache Kafka.
Untuk Target cluster atau Source cluster, pilih salah satu opsi berikut:
- Managed Service for Apache Kafka Cluster: Pilih dari menu.
- Cluster Kafka yang Dikelola Sendiri atau Eksternal: Masukkan alamat bootstrap dalam format
hostname:port_number.
Masukkan Nama topik atau regex topik yang dipisahkan koma.
Tinjau dan sesuaikan Konfigurasi, termasuk setelan keamanan yang diperlukan.
Untuk mengetahui informasi selengkapnya tentang konfigurasi dan autentikasi, lihat Konfigurasi.
Pilih Kebijakan mulai ulang tugas. Untuk mengetahui informasi selengkapnya, lihat Kebijakan memulai ulang tugas.
Klik Create.
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Jalankan perintah
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILEGanti kode berikut:
CONNECTOR_ID: ID atau nama konektor. Untuk mengetahui panduan tentang cara memberi nama konektor, lihat Panduan untuk memberi nama resource Managed Service untuk Apache Kafka. Nama konektor tidak dapat diubah.
LOCATION: Lokasi tempat Anda membuat konektor. Lokasi ini harus sama dengan lokasi tempat Anda membuat Connect cluster.
CONNECT_CLUSTER_ID: ID cluster Connect tempat konektor dibuat.
CONFIG_FILE: Jalur ke file konfigurasi YAML untuk konektor BigQuery Sink.
Berikut adalah contoh file konfigurasi untuk konektor Sumber MirrorMaker 2.0:
connector.class: "org.apache.kafka.connect.mirror.MirrorSourceConnector" name: "MM2_CONNECTOR_ID" source.cluster.alias: "source" target.cluster.alias: "target" topics: "GMK_TOPIC_NAME" source.cluster.bootstrap.servers: "GMK_SOURCE_CLUSTER_DNS" target.cluster.bootstrap.servers: "GMK_TARGET_CLUSTER_DNS" offset-syncs.topic.replication.factor: "1" source.cluster.security.protocol: "SASL_SSL" source.cluster.sasl.mechanism: "OAUTHBEARER" source.cluster.sasl.login.callback.handler.class: com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler source.cluster.sasl.jaas.config: org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; target.cluster.security.protocol: "SASL_SSL" target.cluster.sasl.mechanism: "OAUTHBEARER" target.cluster.sasl.login.callback.handler.class: "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler" target.cluster.sasl.jaas.config: "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
Terraform
Anda dapat menggunakan resource Terraform untuk membuat konektor.
Untuk mempelajari cara menerapkan atau menghapus konfigurasi Terraform, lihat Perintah dasar Terraform.
Go
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Go API.
Untuk melakukan autentikasi ke Managed Service untuk Apache Kafka, siapkan Kredensial Default Aplikasi(ADC). Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Java
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Java API.
Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.
Python
Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di Menginstal library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi Managed Service for Apache Kafka Python API.
Untuk melakukan autentikasi ke Managed Service for Apache Kafka, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan ADC untuk lingkungan pengembangan lokal.