Kafka adalah sistem pesan open source yang terdistribusi untuk menangani data streaming real-time, throughput tinggi, dan volume tinggi. Anda dapat menggunakan Kafka untuk membangun pipeline data streaming yang memindahkan data dengan andal ke berbagai sistem dan aplikasi untuk pemrosesan serta analisis.
Tutorial ini ditujukan untuk administrator platform, arsitek cloud, dan tenaga profesional operasi yang tertarik untuk men-deploy cluster Kafka yang sangat tersedia di Google Kubernetes Engine (GKE).
Membuat infrastruktur cluster
Di bagian ini, Anda akan menjalankan skrip Terraform untuk membuat dua cluster GKE regional.
Cluster utama akan di-deploy di us-central1.
Untuk membuat cluster, ikuti langkah-langkah berikut:
Autopilot
Di Cloud Shell jalankan perintah berikut:
terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID
Saat diminta, ketik yes.
Standard
Di Cloud Shell jalankan perintah berikut:
terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID
Saat diminta, ketik yes.
File konfigurasi Terraform membuat resource berikut untuk men-deploy infrastruktur Anda:
- Membuat repositori Artifact Registry untuk menyimpan image Docker.
- Membuat jaringan VPC dan subnet untuk antarmuka jaringan VM.
- Buat dua cluster GKE.
Terraform membuat cluster pribadi di dua region, dan mengaktifkan Pencadangan untuk GKE demi pemulihan dari bencana (disaster recovery).
Men-deploy Kafka di cluster Anda
Di bagian ini, Anda akan men-deploy Kafka di GKE menggunakan chart Helm. Operasi ini akan membuat resource berikut:
- StatefulSets Kafka dan Zookeeper.
- Deployment pengekspor Kafka. Pengekspor mengumpulkan metrik Kafka untuk pemakaian Prometheus.
- Anggaran Gangguan Pod (PDB) yang membatasi jumlah Pod offline selama gangguan sukarela.
Agar dapat menggunakan chart Helm untuk men-deploy Kafka, ikuti langkah-langkah berikut:
Konfigurasi akses Docker.
gcloud auth configure-docker us-docker.pkg.devIsi Artifact Registry dengan image Kafka dan Zookeeper.
./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0 ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52 ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41 ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74Konfigurasi akses command line
kubectlke cluster utama.gcloud container clusters get-credentials gke-kafka-us-central1 \ --location=${REGION} \ --project=${PROJECT_ID}Membuat namespace.
export NAMESPACE=kafka kubectl create namespace $NAMESPACEInstal Kafka menggunakan versi chart Helm 20.0.6.
cd helm ../scripts/chart.sh kafka 20.0.6 && \ rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka . \ --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"Outputnya mirip dengan hal berikut ini:
NAME: kafka LAST DEPLOYED: Thu Feb 16 03:29:39 2023 NAMESPACE: kafka STATUS: deployed REVISION: 1 TEST SUITE: NonePastikan replika Kafka Anda berjalan (proses ini mungkin memerlukan waktu beberapa menit).
kubectl get all -n kafkaOutputnya mirip dengan yang berikut ini:
--- NAME READY STATUS RESTARTS AGE pod/kafka-0 1/1 Running 2 (3m51s ago) 4m28s pod/kafka-1 1/1 Running 3 (3m41s ago) 4m28s pod/kafka-2 1/1 Running 2 (3m57s ago) 4m28s pod/kafka-zookeeper-0 1/1 Running 0 4m28s pod/kafka-zookeeper-1 1/1 Running 0 4m28s pod/kafka-zookeeper-2 1/1 Running 0 4m28s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kafka ClusterIP 192.168.112.124 <none> 9092/TCP 4m29s service/kafka-app ClusterIP 192.168.75.57 <none> 9092/TCP 35m service/kafka-app-headless ClusterIP None <none> 9092/TCP,9093/TCP 35m service/kafka-app-zookeeper ClusterIP 192.168.117.102 <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-app-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-headless ClusterIP None <none> 9092/TCP,9093/TCP 4m29s service/kafka-zookeeper ClusterIP 192.168.89.249 <none> 2181/TCP,2888/TCP,3888/TCP 4m29s service/kafka-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 4m29s NAME READY AGE statefulset.apps/kafka 3/3 4m29s statefulset.apps/kafka-zookeeper 3/3 4m29s
Membuat data pengujian
Di bagian ini, Anda akan menguji aplikasi Kafka dan membuat pesan.
Buat Pod klien konsumen untuk berinteraksi dengan aplikasi Kafka.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bashBuat topik bernama
topic1dengan tiga partisi dan faktor replikasi tiga.kafka-topics.sh \ --create \ --topic topic1 \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092Pastikan bahwa partisi topik direplikasi di ketiga broker.
kafka-topics.sh \ --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092Outputnya mirip dengan yang berikut ini:
Topic: topic1 TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic1 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2Dalam contoh output, perhatikan bahwa
topic1memiliki tiga partisi, masing-masing dengan pemimpin dan kumpulan replika yang berbeda. Hal ini karena Kafka menggunakan partisi untuk mendistribusikan data ke beberapa broker, sehingga memungkinkan skalabilitas dan fault tolerance yang lebih besar. Faktor replikasi tiga memastikan bahwa setiap partisi memiliki tiga replika, sehingga data masih tersedia meskipun satu atau dua broker gagal.Jalankan perintah berikut untuk membuat nomor pesan secara massal ke dalam
topic1.ALLOW_PLAINTEXT_LISTENER=yes for x in $(seq 0 200); do echo "$x: Message number $x" done | kafka-console-producer.sh \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \ --property parse.key=true \ --property key.separator=":"Jalankan perintah berikut untuk menggunakan
topic1dari semua partisi.kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;Ketik
CTRL+Cuntuk menghentikan proses penggunaan.
Tolok Ukur Kafka
Untuk membuat model kasus penggunaan secara akurat, Anda dapat menjalankan simulasi ekspektasi pemuatan di cluster. Untuk menguji performa, Anda akan menggunakan alat yang disertakan dalam paket Kafka, yaitu skrip kafka-producer-perf-test.sh dan kafka-consumer-perf-test.sh dalam folder bin.
Buat topik untuk benchmark.
kafka-topics.sh \ --create \ --topic topic-benchmark \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092Buat pemuatan di cluster Kafka.
KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \ --topic topic-benchmark \ --num-records 10000000 \ --throughput -1 \ --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \ batch.size=16384 \ acks=all \ linger.ms=500 \ compression.type=none \ --record-size 100 \ --print-metricsProduser akan membuat 10.000.000 kumpulan data pada
topic-benchmark. Outputnya mirip dengan yang berikut ini:623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency. 1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency. 1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency. 2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.Setelah semua data dikirim, Anda akan melihat metrik tambahan dalam output, mirip dengan yang berikut ini:
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark} : 173316.233 producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark} : 10000000.000Untuk keluar dari smartwatch, ketik
CTRL + C.Keluar dari shell Pod.
exit
Mengelola upgrade
Update versi untuk Kafka dan Kubernetes dirilis secara rutin. Ikuti praktik terbaik operasional untuk mengupgrade lingkungan software Anda secara berkala.
Paket upgrade biner Kafka
Di bagian ini, Anda akan memperbarui image Kafka menggunakan Helm dan memverifikasi bahwa topik Anda masih tersedia.
Untuk mengupgrade dari versi Kafka sebelumnya dari chart Helm yang Anda gunakan dalam Men-deploy Kafka di cluster, ikuti langkah-langkah berikut:
Isi Artifact Registry dengan image berikut:
../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2 ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61 ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49 ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0Lakukan langkah-langkah berikut untuk men-deploy chart Helm dengan gambar Kafka dan Zookeeper yang telah diupgrade. Untuk panduan khusus versi, lihat petunjuk Kafka untuk upgrade versi.
- Perbarui versi dependensi
Chart.yaml:
../scripts/chart.sh kafka 20.1.0Deploy chart Helm dengan image Kafka dan Zookeeper baru, seperti yang ditunjukkan pada contoh berikut:
rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka ./ \ --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
Tonton Kafka Pods diupgrade:
kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watchUntuk keluar dari smartwatch, ketik
CTRL + C.- Perbarui versi dependensi
Terhubung ke cluster Kafka menggunakan Pod klien.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bashVerifikasi bahwa Anda dapat mengakses pesan dari
topic1.kafka-console-consumer.sh \ --topic topic1 \ --from-beginning \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092Output akan menampilkan pesan yang dihasilkan dari langkah sebelumnya. Ketik
CTRL+Cuntuk keluar dari proses.Keluar dari Pod klien.
exit
Bersiap untuk pemulihan dari bencana (disaster recovery)
Untuk memastikan workload produksi tetap tersedia jika terjadi peristiwa yang mengganggu layanan, Anda harus menyiapkan rencana pemulihan dari bencana (disaster recovery) (DR). Untuk mempelajari lebih lanjut perencanaan DR, lihat Panduan perencanaan pemulihan dari bencana (disaster recovery).
Untuk mencadangkan dan memulihkan workload di cluster GKE, Anda dapat menggunakan Pencadangan untuk GKE.
Contoh skenario pencadangan dan pemulihan Kafka
Di bagian ini, Anda akan mencadangkan cluster dari gke-kafka-us-central1 dan memulihkan cadangan tersebut ke gke-kafka-us-west1. Anda akan melakukan operasi pencadangan dan pemulihan pada cakupan aplikasi, menggunakan Resource Kustom ProtectedApplication.
Diagram berikut menggambarkan komponen solusi pemulihan dari bencana (disaster recovery), dan hubungannya satu sama lain.
Sebagai persiapan untuk mencadangkan dan memulihkan cluster Kafka Anda, ikuti langkah-langkah berikut:
Siapkan variabel lingkungan.
export BACKUP_PLAN_NAME=kafka-protected-app export BACKUP_NAME=protected-app-backup-1 export RESTORE_PLAN_NAME=kafka-protected-app export RESTORE_NAME=protected-app-restore-1 export REGION=us-central1 export DR_REGION=us-west1 export CLUSTER_NAME=gke-kafka-$REGION export DR_CLUSTER_NAME=gke-kafka-$DR_REGIONVerifikasi bahwa cluster berada dalam status
RUNNING.gcloud container clusters describe $CLUSTER_NAME --location us-central1 --format='value(status)'Buat Rencana Cadangan.
gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \ --selected-applications=kafka/kafka,kafka/zookeeper \ --include-secrets \ --include-volume-data \ --cron-schedule="0 3 * * *" \ --backup-retain-days=7 \ --backup-delete-lock-days=0Buat Cadangan secara manual. Meskipun pencadangan terjadwal biasanya diatur oleh jadwal cron dalam rencana pencadangan, contoh berikut menunjukkan cara memulai operasi pencadangan satu kali.
gcloud beta container backup-restore backups create $BACKUP_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=$BACKUP_PLAN_NAME \ --wait-for-completionBuat Rencana Pemulihan.
gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \ --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \ --cluster-resource-conflict-policy=use-existing-version \ --namespaced-resource-restore-mode=delete-and-restore \ --volume-data-restore-policy=restore-volume-data-from-backup \ --selected-applications=kafka/kafka,kafka/zookeeper \ --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"Lakukan pemulihan secara manual dari Cadangan.
gcloud beta container backup-restore restores create $RESTORE_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --restore-plan=$RESTORE_PLAN_NAME \ --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAMELihat aplikasi yang dipulihkan muncul di cluster cadangan. Mungkin perlu waktu beberapa menit sampai semua Pod berjalan dan siap.
gcloud container clusters get-credentials gke-kafka-us-west1 \ --location us-west1 kubectl get pod -n kafka --watchKetik
CTRL+Cuntuk keluar dari smartwatch saat semua Pod sudah aktif dan berjalan.Validasi bahwa topik sebelumnya dapat diambil oleh konsumen.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bashkafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;Outputnya mirip dengan yang berikut ini:
192 : Message number 192 193 : Message number 193 197 : Message number 197 200 : Message number 200 Processed a total of 201 messagesKetik
CTRL+Cuntuk keluar dari proses.Keluar dari Pod.
exit
Menyimulasikan gangguan layanan Kafka
Di bagian ini, Anda akan menyimulasikan kegagalan node dengan mengganti node Kubernetes yang menghosting broker. Bagian ini hanya berlaku untuk Standard. Autopilot mengelola node untuk Anda, sehingga kegagalan node tidak dapat disimulasikan.
Buat pod klien untuk terhubung ke aplikasi Kafka.
kubectl run kafka-client -n kafka --restart='Never' -it \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bashBuat topik
topic-failover-testdan hasilkan traffic pengujian.kafka-topics.sh \ --create \ --topic topic-failover-test \ --partitions 1 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092Tentukan broker mana yang merupakan pemimpin untuk topik
topic-failover-test.kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092Outputnya mirip dengan yang berikut ini:
Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2Pada output di atas,
Leader: 1berarti pemimpin untuktopic-failover-testadalah broker 1. Ini sesuai dengan Podkafka-1.Buka terminal baru dan hubungkan ke cluster yang sama.
gcloud container clusters get-credentials gke-kafka-us-west1 --location us-west1 --project PROJECT_IDTemukan Pod node
kafka-1yang berjalan.kubectl get pod -n kafka kafka-1 -o wideOutputnya mirip dengan yang berikut ini:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 1 (35m ago) 36m 192.168.132.4 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 <none> <none>Pada output di atas, Anda melihat Pod
kafka-1berjalan pada nodegke-gke-kafka-us-west1-pool-system-a0d0d395-nx72.Kosongi node untuk mengeluarkan Pod.
kubectl drain NODE \ --delete-emptydir-data \ --force \ --ignore-daemonsetsGanti NODE dengan pod node kafka-1 yang sedang berjalan. Dalam contoh ini, node-nya adalah
gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72.Outputnya mirip dengan yang berikut ini:
node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578 evicting pod kafka/kafka-1 evicting pod kube-system/kube-dns-7d48cb57b-j4d8f evicting pod kube-system/calico-typha-56995c8d85-5clph pod/calico-typha-56995c8d85-5clph evicted pod/kafka-1 evicted pod/kube-dns-7d48cb57b-j4d8f evicted node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drainedTemukan Pod node
kafka-1yang berjalan.kubectl get pod -n kafka kafka-1 -o wideOutputnya akan mirip dengan berikut ini:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 0 2m49s 192.168.128.8 gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7 <none> <none>Dari output di atas, Anda akan melihat aplikasi berjalan di node baru.
Di terminal yang terhubung ke Pod
kafka-client, tentukan broker yang merupakan pemimpin untuktopic-failover-test.kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092Outputnya akan mirip dengan berikut ini:
Topic: topic-failover-test TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 0,2,1Dalam contoh output, pemimpin masih 1. Namun, sekarang layanan tersebut berjalan di node baru.
Menguji kegagalan pemimpin Kafka
Di Cloud Shell, hubungkan ke klien Kafka, dan gunakan
describeuntuk melihat pemimpin yang dipilih untuk setiap partisi ditopic1.kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092Outputnya mirip dengan yang berikut ini:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: topic1 Partition: 1 Leader: 0 Replicas: 2,1,0 Isr: 0,2,1 Topic: topic1 Partition: 2 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1Di Cloud Shell yang tidak terhubung ke klien Kafka, hapus broker pemimpin
kafka-0untuk memaksakan pemilihan pemimpin baru. Anda harus menghapus indeks yang dipetakan ke salah satu pemimpin di output sebelumnya.kubectl delete pod -n kafka kafka-0 --forceOutputnya mirip dengan yang berikut ini:
pod "kafka-0" force deletedDi Cloud Shell yang terhubung ke klien Kafka, gunakan
describeuntuk melihat pemimpin yang dipilih.kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092Outputnya mirip dengan yang berikut ini:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1Di output tersebut, pemimpin baru untuk setiap partisi akan berubah, jika pemimpin itu ditetapkan ke pemimpin yang diinterupsi (
kafka-0). Hal ini menunjukkan bahwa pemimpin asli diganti saat Pod dihapus dan dibuat ulang.