Template Apache Kafka ke Apache Kafka membuat pipeline streaming yang menyerap data sebagai byte dari sumber Apache Kafka, lalu menulis byte ke sink Apache Kafka.
Persyaratan pipeline
- Topik sumber Apache Kafka harus ada.
- Server broker sink dan sumber Apache Kafka harus berjalan dan dapat dijangkau dari mesin worker Dataflow.
- Jika Anda menggunakan Layanan Terkelola Google Cloud untuk Apache Kafka sebagai sumber atau sink, topik harus ada sebelum meluncurkan template.
Format pesan Kafka
Pesan sumber Apache Kafka dibaca sebagai byte, dan byte ditulis ke sink Apache Kafka.
Autentikasi
Template Apache Kafka ke Apache Kafka mendukung autentikasi SASL/PLAIN dan TLS ke broker Kafka.
Parameter template
Parameter yang diperlukan
- readBootstrapServerAndTopic: Server dan topik Bootstrap Kafka untuk membaca input. Contoh,
localhost:9092;topic1,topic2. - kafkaReadAuthenticationMode: Mode autentikasi yang akan digunakan dengan cluster Kafka. Gunakan
KafkaAuthenticationMethod.NONEuntuk tanpa autentikasi,KafkaAuthenticationMethod.SASL_PLAINuntuk nama pengguna dan sandi SASL/PLAIN,KafkaAuthenticationMethod.SASL_SCRAM_512untuk autentikasi SASL_SCRAM_512, danKafkaAuthenticationMethod.TLSuntuk autentikasi berbasis sertifikat.KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALShanya boleh digunakan untuk cluster Google Cloud Apache Kafka untuk BigQuery, yang memungkinkan autentikasi menggunakan kredensial default aplikasi. - writeBootstrapServerAndTopic: Topik Kafka yang menjadi tujuan penulisan output.
- kafkaWriteAuthenticationMethod: Mode autentikasi yang akan digunakan dengan cluster Kafka. Gunakan NONE untuk tanpa autentikasi, SASL_PLAIN untuk nama pengguna dan sandi SASL/PLAIN, SASL_SCRAM_512 untuk autentikasi berbasis SASL_SCRAM_512, dan TLS untuk autentikasi berbasis sertifikat. Nilai defaultnya: APPLICATION_DEFAULT_CREDENTIALS.
Parameter opsional
- enableCommitOffsets: Commit offset pesan yang diproses ke Kafka. Jika diaktifkan, fitur ini akan meminimalkan kehilangan atau pemrosesan duplikat pesan saat memulai ulang pipeline. Memerlukan penentuan ID Grup Konsumen. Nilai defaultnya: salah.
- consumerGroupId: ID unik untuk grup konsumen tempat pipeline ini berada. Wajib jika Commit Offset ke Kafka diaktifkan. Nilai defaultnya adalah kosong.
- kafkaReadOffset: Titik awal untuk membaca pesan saat tidak ada offset yang di-commit. Yang paling awal dimulai dari awal, yang terbaru dimulai dari pesan terbaru. Nilai defaultnya: terbaru.
- kafkaReadUsernameSecretId: ID secret dari Secret Manager Google Cloud yang berisi nama pengguna Kafka untuk digunakan dengan autentikasi
SASL_PLAIN. Contoh,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Nilai defaultnya adalah kosong. - kafkaReadPasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi Kafka untuk digunakan dengan autentikasi
SASL_PLAIN. Contoh,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Nilai defaultnya adalah kosong. - kafkaReadKeystoreLocation: Jalur Google Cloud Storage ke file Java KeyStore (JKS) yang berisi sertifikat TLS dan kunci pribadi yang akan digunakan saat mengautentikasi dengan cluster Kafka. Contoh,
gs://your-bucket/keystore.jks. - kafkaReadTruststoreLocation: Jalur Google Cloud Storage ke file Java TrustStore (JKS) yang berisi sertifikat tepercaya untuk digunakan dalam memverifikasi identitas broker Kafka.
- kafkaReadTruststorePasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi yang akan digunakan untuk mengakses file Java TrustStore (JKS) untuk autentikasi TLS Kafka. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - kafkaReadKeystorePasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi yang akan digunakan untuk mengakses file Java KeyStore (JKS) untuk autentikasi TLS Kafka. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - kafkaReadKeyPasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi yang akan digunakan untuk mengakses kunci pribadi dalam file Java KeyStore (JKS) untuk autentikasi TLS Kafka. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - kafkaReadSaslScramUsernameSecretId: ID secret dari Secret Manager Google Cloud yang berisi nama pengguna Kafka untuk digunakan dengan autentikasi
SASL_SCRAM. Contoh,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - kafkaReadSaslScramPasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi Kafka untuk digunakan dengan autentikasi
SASL_SCRAM. Contoh,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - kafkaReadSaslScramTruststoreLocation: Jalur Google Cloud Storage ke file Java TrustStore (JKS) yang berisi sertifikat tepercaya yang akan digunakan untuk memverifikasi identitas broker Kafka.
- kafkaReadSaslScramTruststorePasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi yang akan digunakan untuk mengakses file Java TrustStore (JKS) untuk autentikasi SASL_SCRAM Kafka. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - kafkaWriteUsernameSecretId: ID secret dari Secret Manager Google Cloud yang berisi nama pengguna Kafka untuk autentikasi SASL_PLAIN dengan cluster Kafka tujuan. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Nilai defaultnya adalah kosong. - kafkaWritePasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi Kafka yang akan digunakan untuk autentikasi SASL_PLAIN dengan cluster Kafka tujuan. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. Nilai defaultnya adalah kosong. - kafkaWriteKeystoreLocation: Jalur Google Cloud Storage ke file Java KeyStore (JKS) yang berisi sertifikat TLS dan kunci pribadi untuk mengautentikasi dengan cluster Kafka tujuan. Contoh,
gs://<BUCKET>/<KEYSTORE>.jks. - kafkaWriteTruststoreLocation: Jalur Google Cloud Storage ke file Java TrustStore (JKS) yang berisi sertifikat tepercaya untuk digunakan dalam memverifikasi identitas broker Kafka tujuan.
- kafkaWriteTruststorePasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi yang akan digunakan untuk mengakses file Java TrustStore (JKS) untuk autentikasi TLS dengan cluster Kafka tujuan. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - kafkaWriteKeystorePasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi untuk mengakses file Java KeyStore (JKS) yang akan digunakan untuk autentikasi TLS dengan cluster Kafka tujuan. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. - kafkaWriteKeyPasswordSecretId: ID secret dari Secret Manager Google Cloud yang berisi sandi yang akan digunakan untuk mengakses kunci pribadi dalam file Java KeyStore (JKS) untuk autentikasi TLS dengan cluster Kafka tujuan. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>.
Menjalankan template
Konsol
- Buka halaman Dataflow Membuat tugas dari template. Buka Membuat tugas dari template
- Di kolom Nama tugas, masukkan nama tugas yang unik.
- Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah
us-central1.Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.
- Dari menu drop-down Template Dataflow, pilih the Kafka to Cloud Storage template.
- Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
- Opsional: Untuk beralih dari pemrosesan tepat satu kali ke mode streaming setidaknya satu kali, pilih Setidaknya Satu Kali.
- Klik Jalankan tugas.
gcloud
Di shell atau terminal Anda, jalankan template:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Kafka \ --parameters \ readBootstrapServerAndTopic=READ_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ writeBootstrapServerAndTopic=WRITE_BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaWriteAuthenticationMethod=APPLICATION_DEFAULT_CREDENTIALS
Ganti kode berikut:
PROJECT_ID: Google Cloud Project ID tempat Anda ingin menjalankan tugas DataflowJOB_NAME: nama tugas unik pilihan AndaREGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya,us-central1VERSION: versi template yang ingin Anda gunakanAnda dapat menggunakan nilai berikut:
latestuntuk menggunakan template versi terbaru, yang tersedia di folder induk tanpa tanggal di bucket—gs://dataflow-templates-REGION_NAME/latest/- nama versi, seperti
2023-09-12-00_RC00, untuk menggunakan template versi tertentu, yang dapat ditemukan bertingkat di masing-masing folder induk yang diberi tanggal dalam bucket—gs://dataflow-templates-REGION_NAME/
READ_BOOTSTRAP_SERVER_AND_TOPIC: alamat server bootstrap Apache Kafka dan topik yang akan dibacaWRITE_BOOTSTRAP_SERVER_AND_TOPIC: alamat server bootstrap Apache Kafka dan topik yang akan ditulisiFormat alamat dan topik server bootstrap bergantung pada jenis cluster:
- Cluster Managed Service for Apache Kafka:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME - Cluster Kafka eksternal:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster Managed Service for Apache Kafka:
API
Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "READ_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "writeBootstrapServerAndTopic": "WRITE_BOOTSTRAP_SERVER_AND_TOPIC", "kafkaWriteAuthenticationMethod": "APPLICATION_DEFAULT_CREDENTIALS }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Kafka", } }
Ganti kode berikut:
PROJECT_ID: Google Cloud Project ID tempat Anda ingin menjalankan tugas DataflowJOB_NAME: nama tugas unik pilihan AndaLOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya,us-central1VERSION: versi template yang ingin Anda gunakanAnda dapat menggunakan nilai berikut:
latestuntuk menggunakan template versi terbaru, yang tersedia di folder induk tanpa tanggal di bucket—gs://dataflow-templates-REGION_NAME/latest/- nama versi, seperti
2023-09-12-00_RC00, untuk menggunakan template versi tertentu, yang dapat ditemukan bertingkat di masing-masing folder induk yang diberi tanggal dalam bucket—gs://dataflow-templates-REGION_NAME/
READ_BOOTSTRAP_SERVER_AND_TOPIC: alamat server bootstrap Apache Kafka dan topik yang akan dibacaWRITE_BOOTSTRAP_SERVER_AND_TOPIC: alamat server bootstrap Apache Kafka dan topik yang akan ditulisiFormat alamat dan topik server bootstrap bergantung pada jenis cluster:
- Cluster Managed Service for Apache Kafka:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME - Cluster Kafka eksternal:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster Managed Service for Apache Kafka:
Langkah berikutnya
- Pelajari template Dataflow.
- Lihat daftar template yang disediakan Google.