Kafka 是開放原始碼的分散式發布/訂閱訊息傳遞系統,可處理大量、高輸送量和即時串流資料。您可以使用 Kafka 建構串流資料管道,在不同系統和應用程式之間穩定移動資料,以進行處理和分析。
本教學課程的適用對象為平台管理員、雲端架構師和營運專員,他們有興趣在 Google Kubernetes Engine (GKE) 上部署高可用性的 Kafka 叢集。
建立叢集基礎架構
在本節中,您將執行 Terraform 指令碼,建立兩個地區 GKE 叢集。主要叢集會部署在 us-central1
中。
如要建立叢集,請按照下列步驟操作:
Autopilot
在 Cloud Shell 中執行下列指令:
terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID
系統顯示提示訊息時,請輸入 yes
。
標準
在 Cloud Shell 中執行下列指令:
terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID
系統顯示提示訊息時,請輸入 yes
。
Terraform 設定檔會建立下列資源,以部署基礎架構:
- 建立 Artifact Registry 存放區,用於儲存 Docker 映像檔。
- 為 VM 的網路介面建立虛擬私有雲網路和子網路。
- 建立兩個 GKE 叢集。
Terraform 會在這兩個區域中建立私人叢集,並啟用 Backup for GKE 進行災難復原。
在叢集上部署 Kafka
在本節中,您將使用 Helm 資訊套件在 GKE 上部署 Kafka。這項作業會建立下列資源:
- Kafka 和 Zookeeper StatefulSet。
- Kafka 匯出工具部署作業。匯出工具會收集 Kafka 指標,供 Prometheus 使用。
- Pod 中斷預算 (PDB): 限制自願中斷期間離線的 Pod 數量。
如要使用 Helm 圖表部署 Kafka,請按照下列步驟操作:
設定 Docker 存取權。
gcloud auth configure-docker us-docker.pkg.dev
在 Artifact Registry 中填入 Kafka 和 Zookeeper 映像檔。
./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0 ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52 ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41 ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
設定主要叢集的
kubectl
指令列存取權。gcloud container clusters get-credentials gke-kafka-us-central1 \ --location=${REGION} \ --project=${PROJECT_ID}
建立命名空間。
export NAMESPACE=kafka kubectl create namespace $NAMESPACE
使用 Helm 資訊套件 20.0.6 版安裝 Kafka。
cd helm ../scripts/chart.sh kafka 20.0.6 && \ rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka . \ --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
輸出結果會與下列內容相似:
NAME: kafka LAST DEPLOYED: Thu Feb 16 03:29:39 2023 NAMESPACE: kafka STATUS: deployed REVISION: 1 TEST SUITE: None
確認 Kafka 副本正在執行 (可能需要幾分鐘)。
kubectl get all -n kafka
輸出結果會與下列內容相似:
--- NAME READY STATUS RESTARTS AGE pod/kafka-0 1/1 Running 2 (3m51s ago) 4m28s pod/kafka-1 1/1 Running 3 (3m41s ago) 4m28s pod/kafka-2 1/1 Running 2 (3m57s ago) 4m28s pod/kafka-zookeeper-0 1/1 Running 0 4m28s pod/kafka-zookeeper-1 1/1 Running 0 4m28s pod/kafka-zookeeper-2 1/1 Running 0 4m28s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kafka ClusterIP 192.168.112.124 <none> 9092/TCP 4m29s service/kafka-app ClusterIP 192.168.75.57 <none> 9092/TCP 35m service/kafka-app-headless ClusterIP None <none> 9092/TCP,9093/TCP 35m service/kafka-app-zookeeper ClusterIP 192.168.117.102 <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-app-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-headless ClusterIP None <none> 9092/TCP,9093/TCP 4m29s service/kafka-zookeeper ClusterIP 192.168.89.249 <none> 2181/TCP,2888/TCP,3888/TCP 4m29s service/kafka-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 4m29s NAME READY AGE statefulset.apps/kafka 3/3 4m29s statefulset.apps/kafka-zookeeper 3/3 4m29s
建立測試資料
在本節中,您將測試 Kafka 應用程式並產生訊息。
建立消費者端 Pod,與 Kafka 應用程式互動。
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
建立名為
topic1
的主題,其中包含三個分割區,以及三個複寫因數。kafka-topics.sh \ --create \ --topic topic1 \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
確認主題分區已在所有三個代理程式中複製。
kafka-topics.sh \ --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
輸出結果會與下列內容相似:
Topic: topic1 TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic1 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
在範例輸出中,請注意
topic1
有三個分割區,每個分割區都有不同的領導者和副本集。這是因為 Kafka 會使用分割功能,將資料分配到多個代理程式,進而提升擴充性和容錯能力。複製係數為 3 可確保每個分割區都有三個副本,即使一或兩個代理程式發生故障,資料仍可供使用。執行下列指令,將大量訊息號碼產生至
topic1
。ALLOW_PLAINTEXT_LISTENER=yes for x in $(seq 0 200); do echo "$x: Message number $x" done | kafka-console-producer.sh \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \ --property parse.key=true \ --property key.separator=":"
執行下列指令,從所有分割區取用
topic1
。kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
輸入
CTRL+C
即可停止消費者程序。
Kafka 基準
如要準確模擬用途,您可以對叢集預期負載執行模擬作業。如要測試效能,請使用 Kafka 套件隨附的工具,也就是 bin
資料夾中的 kafka-producer-perf-test.sh
和 kafka-consumer-perf-test.sh
指令碼。
建立基準化主題。
kafka-topics.sh \ --create \ --topic topic-benchmark \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
在 Kafka 叢集上建立負載。
KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \ --topic topic-benchmark \ --num-records 10000000 \ --throughput -1 \ --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \ batch.size=16384 \ acks=all \ linger.ms=500 \ compression.type=none \ --record-size 100 \ --print-metrics
供應商會在
topic-benchmark
產生 10,000,000 筆記錄。輸出內容會類似以下內容:623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency. 1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency. 1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency. 2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
所有記錄都傳送完畢後,輸出內容中應會顯示其他指標,如下所示:
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark} : 173316.233 producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark} : 10000000.000
如要退出手錶,請輸入
CTRL + C
。結束 Pod 殼層。
exit
管理升級
Kafka 和 Kubernetes 的版本更新會定期發布。請遵循作業最佳做法,定期升級軟體環境。
規劃 Kafka 二進位檔升級作業
在本節中,您將使用 Helm 更新 Kafka 映像檔,並確認主題仍可使用。
如要從「在叢集上部署 Kafka」一文中使用的 Helm 圖表升級較舊的 Kafka 版本,請按照下列步驟操作:
使用下列映像檔填入 Artifact Registry:
../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2 ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61 ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49 ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
請按照下列步驟,使用升級後的 Kafka 和 Zookeeper 映像檔部署 Helm 資訊套件。如需特定版本的指南,請參閱 Kafka 版本升級說明。
- 更新
Chart.yaml
依附元件版本:
../scripts/chart.sh kafka 20.1.0
使用新的 Kafka 和 Zookeeper 映像檔部署 Helm 資訊套件,如下列範例所示:
rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka ./ \ --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
觀看 Kafka Pod 升級:
kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
如要退出手錶,請輸入
CTRL + C
。- 更新
使用用戶端 Pod 連線至 Kafka 叢集。
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
確認你可以存取
topic1
的訊息。kafka-console-consumer.sh \ --topic topic1 \ --from-beginning \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
輸出內容應顯示上一個步驟產生的訊息。輸入
CTRL+C
即可結束程序。結束用戶端 Pod。
exit
為災難復原做好準備
為確保生產環境工作負載在服務中斷事件發生時仍可使用,您應準備災難復原 (DR) 計畫。如要進一步瞭解 DR 規劃,請參閱「災難復原規劃指南」。
如要備份及還原 GKE 叢集中的工作負載,可以使用 GKE 備份服務。
Kafka 備份和還原情境範例
在本節中,您將從 gke-kafka-us-central1
備份叢集,並將備份還原至 gke-kafka-us-west1
。您將使用 ProtectedApplication
自訂資源,在應用程式範圍執行備份和還原作業。
下圖說明災難復原解決方案的元件,以及這些元件之間的關聯。
如要準備備份及還原 Kafka 叢集,請按照下列步驟操作:
設定環境變數。
export BACKUP_PLAN_NAME=kafka-protected-app export BACKUP_NAME=protected-app-backup-1 export RESTORE_PLAN_NAME=kafka-protected-app export RESTORE_NAME=protected-app-restore-1 export REGION=us-central1 export DR_REGION=us-west1 export CLUSTER_NAME=gke-kafka-$REGION export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
確認叢集處於
RUNNING
狀態。gcloud container clusters describe $CLUSTER_NAME --location us-central1 --format='value(status)'
建立備份方案。
gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \ --selected-applications=kafka/kafka,kafka/zookeeper \ --include-secrets \ --include-volume-data \ --cron-schedule="0 3 * * *" \ --backup-retain-days=7 \ --backup-delete-lock-days=0
手動建立備份。排定的備份作業通常會依據備份方案中的 cron 排程執行,但以下範例說明如何啟動一次性備份作業。
gcloud beta container backup-restore backups create $BACKUP_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=$BACKUP_PLAN_NAME \ --wait-for-completion
建立還原方案。
gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \ --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \ --cluster-resource-conflict-policy=use-existing-version \ --namespaced-resource-restore-mode=delete-and-restore \ --volume-data-restore-policy=restore-volume-data-from-backup \ --selected-applications=kafka/kafka,kafka/zookeeper \ --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
手動從備份還原。
gcloud beta container backup-restore restores create $RESTORE_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --restore-plan=$RESTORE_PLAN_NAME \ --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
在備份叢集中查看還原的應用程式。所有 Pod 可能需要幾分鐘才能執行並準備就緒。
gcloud container clusters get-credentials gke-kafka-us-west1 \ --location us-west1 kubectl get pod -n kafka --watch
當所有 Pod 啟動並執行後,輸入
CTRL+C
即可結束監控。驗證消費者是否可擷取先前的主題。
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
輸出結果會與下列內容相似:
192 : Message number 192 193 : Message number 193 197 : Message number 197 200 : Message number 200 Processed a total of 201 messages
輸入
CTRL+C
即可結束程序。離開 Pod。
exit
模擬 Kafka 服務中斷
在本節中,您將替換代管代理程式的 Kubernetes 節點,模擬節點故障。本節內容僅適用於標準版。 Autopilot 會為您管理節點,因此無法模擬節點故障。
建立用戶端 Pod,連線至 Kafka 應用程式。
kubectl run kafka-client -n kafka --restart='Never' -it \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
建立主題
topic-failover-test
並產生測試流量。kafka-topics.sh \ --create \ --topic topic-failover-test \ --partitions 1 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
判斷
topic-failover-test
主題的領導者代理程式。kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
輸出結果會與下列內容相似:
Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
在上述輸出內容中,
Leader: 1
表示topic-failover-test
的領導者是代理程式 1。這對應於 Podkafka-1
。開啟新的終端機,並連線至同一個叢集。
gcloud container clusters get-credentials gke-kafka-us-west1 --location us-west1 --project PROJECT_ID
找出 Pod
kafka-1
執行的節點。kubectl get pod -n kafka kafka-1 -o wide
輸出結果會與下列內容相似:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 1 (35m ago) 36m 192.168.132.4 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 <none> <none>
在上述輸出內容中,您會看到 Pod
kafka-1
在節點gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
上執行。排空節點以逐出 Pod。
kubectl drain NODE \ --delete-emptydir-data \ --force \ --ignore-daemonsets
將 NODE 替換為執行 kafka-1 的節點 Pod。在本例中,節點為
gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
。輸出結果會與下列內容相似:
node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578 evicting pod kafka/kafka-1 evicting pod kube-system/kube-dns-7d48cb57b-j4d8f evicting pod kube-system/calico-typha-56995c8d85-5clph pod/calico-typha-56995c8d85-5clph evicted pod/kafka-1 evicted pod/kube-dns-7d48cb57b-j4d8f evicted node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
找出 Pod
kafka-1
執行的節點。kubectl get pod -n kafka kafka-1 -o wide
輸出內容應如下所示:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 0 2m49s 192.168.128.8 gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7 <none> <none>
從上述輸出內容中,您可以看到應用程式是在新節點上執行。
在連線至
kafka-client
Pod 的終端機中,判斷哪個代理程式是topic-failover-test
的領導者。kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
輸出內容應如下所示:
Topic: topic-failover-test TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 0,2,1
在範例輸出內容中,領導者仍為 1。不過,現在是在新節點上執行。
測試 Kafka 領導者故障
在 Cloud Shell 中連線至 Kafka 用戶端,然後使用
describe
查看topic1
中每個分區選出的領導者。kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
輸出結果會與下列內容相似:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: topic1 Partition: 1 Leader: 0 Replicas: 2,1,0 Isr: 0,2,1 Topic: topic1 Partition: 2 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1
在未連線至 Kafka 用戶端的 Cloud Shell 中,刪除
kafka-0
領導者代理程式,強制進行新的領導者選舉。您應刪除對應至先前輸出內容中其中一位領導者的索引。kubectl delete pod -n kafka kafka-0 --force
輸出結果會與下列內容相似:
pod "kafka-0" force deleted
在連線至 Kafka 用戶端的 Cloud Shell 中,使用
describe
查看選出的領導者。kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
輸出結果會與下列內容相似:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
在輸出內容中,每個分割區的新領導者都會變更 (如果已指派給中斷的領導者 (
kafka-0
))。這表示系統在刪除並重新建立 Pod 時,已取代原始領導者。