Kafka 是一个开源分布式发布-订阅消息传递系统,用于处理大量、高吞吐量和实时的流式数据。您可以使用 Kafka 构建流式数据流水线,以在不同系统和应用之间可靠地移动数据来进行处理和分析。
本教程适用于想要在 Google Kubernetes Engine (GKE) 上部署高可用性 Kafka 集群的平台管理员、云架构师和运维专业人员。
创建集群基础架构
在本部分中,您将运行 Terraform 脚本来创建两个区域级 GKE 集群。主要集群将部署在 us-central1
中。
如需创建集群,请运行以下命令:
Autopilot
在 Cloud Shell 中,运行以下命令:
terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID
出现提示时,请输入 yes
。
Standard
在 Cloud Shell 中,运行以下命令:
terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID
出现提示时,请输入 yes
。
Terraform 配置文件会创建以下资源来部署基础架构:
- 创建 Artifact Registry 制品库以存储 Docker 映像。
- 为虚拟机的网络接口创建 VPC 网络和子网。
- 创建两个 GKE 集群。
Terraform 会在两个区域中创建专用集群,并启用 Backup for GKE 以用于灾难恢复。
在集群上部署 Kafka
在本部分中,您将使用 Helm 图表在 GKE 上部署 Kafka。该操作会创建以下资源:
- Kafka 和 Zookeeper StatefulSet。
- Kafka 导出器部署。导出器会收集 Kafka 指标以供 Prometheus 使用。
- Pod 中断预算 (PDB),用于限制在主动中断期间离线 Pod 的数量。
如需使用 Helm 图表部署 Kafka,请按照以下步骤操作:
配置 Docker 访问权限。
gcloud auth configure-docker us-docker.pkg.dev
使用 Kafka 和 Zookeeper 映像填充 Artifact Registry。
./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0 ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52 ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41 ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
配置对主要集群的
kubectl
命令行访问权限。gcloud container clusters get-credentials gke-kafka-us-central1 \ --location=${REGION} \ --project=${PROJECT_ID}
创建命名空间。
export NAMESPACE=kafka kubectl create namespace $NAMESPACE
使用 Helm 图表 20.0.6 版安装 Kafka。
cd helm ../scripts/chart.sh kafka 20.0.6 && \ rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka . \ --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
输出类似于以下内容:
NAME: kafka LAST DEPLOYED: Thu Feb 16 03:29:39 2023 NAMESPACE: kafka STATUS: deployed REVISION: 1 TEST SUITE: None
验证 Kafka 副本是否正在运行(这可能需要几分钟的时间)。
kubectl get all -n kafka
输出内容类似如下:
--- NAME READY STATUS RESTARTS AGE pod/kafka-0 1/1 Running 2 (3m51s ago) 4m28s pod/kafka-1 1/1 Running 3 (3m41s ago) 4m28s pod/kafka-2 1/1 Running 2 (3m57s ago) 4m28s pod/kafka-zookeeper-0 1/1 Running 0 4m28s pod/kafka-zookeeper-1 1/1 Running 0 4m28s pod/kafka-zookeeper-2 1/1 Running 0 4m28s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kafka ClusterIP 192.168.112.124 <none> 9092/TCP 4m29s service/kafka-app ClusterIP 192.168.75.57 <none> 9092/TCP 35m service/kafka-app-headless ClusterIP None <none> 9092/TCP,9093/TCP 35m service/kafka-app-zookeeper ClusterIP 192.168.117.102 <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-app-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-headless ClusterIP None <none> 9092/TCP,9093/TCP 4m29s service/kafka-zookeeper ClusterIP 192.168.89.249 <none> 2181/TCP,2888/TCP,3888/TCP 4m29s service/kafka-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 4m29s NAME READY AGE statefulset.apps/kafka 3/3 4m29s statefulset.apps/kafka-zookeeper 3/3 4m29s
创建测试数据
在本部分中,您将测试 Kafka 应用并生成消息。
创建与 Kafka 应用进行交互的使用方客户端 Pod。
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
创建一个名为
topic1
的主题,其包含三个分区且复制因子为 3。kafka-topics.sh \ --create \ --topic topic1 \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
验证是否已在所有三个代理中复制主题分区。
kafka-topics.sh \ --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
输出内容类似如下:
Topic: topic1 TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic1 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
在示例输出中,请注意
topic1
有三个分区,每个分区都有不同的主要代理和副本集。这是因为 Kafka 使用分区将数据分布到多个代理,从而实现更高的可伸缩性和容错能力。复制因子为 3 可确保每个分区都有三个副本,因此,即使一个或两个代理发生故障,数据也仍然可用。运行以下命令以将消息编号批量生成到
topic1
中。ALLOW_PLAINTEXT_LISTENER=yes for x in $(seq 0 200); do echo "$x: Message number $x" done | kafka-console-producer.sh \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \ --property parse.key=true \ --property key.separator=":"
运行以下命令以在所有分区中使用
topic1
。kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
按
CTRL+C
以停止使用方进程。
对 Kafka 进行基准化分析
如需准确为用例建模,您可以对集群上的预期负载运行模拟。如需测试性能,您需要使用 Kafka 软件包中包含的工具,即 bin
文件夹中的 kafka-producer-perf-test.sh
和 kafka-consumer-perf-test.sh
脚本。
创建一个用于基准测试的主题。
kafka-topics.sh \ --create \ --topic topic-benchmark \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
在 Kafka 集群上创建负载。
KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \ --topic topic-benchmark \ --num-records 10000000 \ --throughput -1 \ --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \ batch.size=16384 \ acks=all \ linger.ms=500 \ compression.type=none \ --record-size 100 \ --print-metrics
提供方会为
topic-benchmark
生成 1000 万条记录。输出内容类似如下:623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency. 1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency. 1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency. 2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
所有记录都发送后,您应该会在输出中看到其他指标,如下所示:
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark} : 173316.233 producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark} : 10000000.000
如需退出监控,请按
CTRL + C
。退出 Pod shell。
exit
管理升级
Kafka 和 Kubernetes 的版本更新会定期发布。请遵循运营最佳实践来定期更新您的软件环境。
规划 Kafka 二进制文件升级
在本部分中,您将使用 Helm 更新 Kafka 映像,并验证主题是否仍然可用。
如需通过在集群上部署 Kafka 中使用的 Helm 图表从早期 Kafka 版本进行升级,请按照以下步骤操作:
使用以下映像填充 Artifact Registry:
../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2 ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61 ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49 ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
执行以下步骤以使用升级后的 Kafka 和 ZooKeeper 映像部署 Helm 图表。如需了解特定于版本的指导信息,请参阅有关版本升级的 Kafka 说明。
- 更新
Chart.yaml
依赖项版本:
../scripts/chart.sh kafka 20.1.0
使用新的 Kafka 和 Zookeeper 映像部署 Helm 图表,如以下示例所示:
rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka ./ \ --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
监控 Kafka Pod 是否升级:
kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
如需退出监控,请按
CTRL + C
。- 更新
使用客户端 Pod 连接到 Kafka 集群。
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
验证您是否可以访问来自
topic1
的消息。kafka-console-consumer.sh \ --topic topic1 \ --from-beginning \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
输出应显示上一步中生成的消息。按
CTRL+C
以退出进程。退出客户端 pod。
exit
为灾难恢复做好准备
为确保生产工作负载在发生服务中断事件时仍然可用,您应该准备灾难恢复 (DR) 规划。如需详细了解灾难恢复规划,请参阅灾难恢复规划指南。
如需在 GKE 集群上备份和恢复工作负载,您可以使用 Backup for GKE。
Kafka 备份和恢复场景示例
在本部分中,您将从 gke-kafka-us-central1
备份集群,并将备份恢复到 gke-kafka-us-west1
。您将使用 ProtectedApplication
自定义资源在应用范围内执行备份和恢复操作。
下图展示了灾难恢复解决方案的组件,以及相互之间的关系。
如需准备备份和恢复 Kafka 集群,请按照以下步骤操作:
设置环境变量。
export BACKUP_PLAN_NAME=kafka-protected-app export BACKUP_NAME=protected-app-backup-1 export RESTORE_PLAN_NAME=kafka-protected-app export RESTORE_NAME=protected-app-restore-1 export REGION=us-central1 export DR_REGION=us-west1 export CLUSTER_NAME=gke-kafka-$REGION export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
验证集群是否处于
RUNNING
状态。gcloud container clusters describe $CLUSTER_NAME --location us-central1 --format='value(status)'
创建备份方案。
gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \ --selected-applications=kafka/kafka,kafka/zookeeper \ --include-secrets \ --include-volume-data \ --cron-schedule="0 3 * * *" \ --backup-retain-days=7 \ --backup-delete-lock-days=0
手动创建备份。虽然计划备份通常受备份方案中的 cron-schedule 约束,但以下示例展示了如何启动一次性备份操作。
gcloud beta container backup-restore backups create $BACKUP_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=$BACKUP_PLAN_NAME \ --wait-for-completion
创建恢复方案。
gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \ --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \ --cluster-resource-conflict-policy=use-existing-version \ --namespaced-resource-restore-mode=delete-and-restore \ --volume-data-restore-policy=restore-volume-data-from-backup \ --selected-applications=kafka/kafka,kafka/zookeeper \ --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
通过备份手动进行恢复。
gcloud beta container backup-restore restores create $RESTORE_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --restore-plan=$RESTORE_PLAN_NAME \ --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
监控恢复的应用是否出现在备份集群中。所有 Pod 可能需要几分钟时间才能运行并准备就绪。
gcloud container clusters get-credentials gke-kafka-us-west1 \ --location us-west1 kubectl get pod -n kafka --watch
按
CTRL+C
以在所有 Pod 已启动并运行时退出监控。验证使用方是否可以提取之前的主题。
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
输出内容类似如下:
192 : Message number 192 193 : Message number 193 197 : Message number 197 200 : Message number 200 Processed a total of 201 messages
按
CTRL+C
以退出进程。退出 Pod。
exit
模拟 Kafka 服务中断
在本部分中,您将通过替换托管代理的 Kubernetes 节点来模拟节点故障。本部分仅适用于 Standard。Autopilot 会为您管理节点,因此无法模拟节点故障。
创建客户端 Pod 以连接到 Kafka 应用。
kubectl run kafka-client -n kafka --restart='Never' -it \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
创建主题
topic-failover-test
并生成测试流量。kafka-topics.sh \ --create \ --topic topic-failover-test \ --partitions 1 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
确定哪个代理是
topic-failover-test
主题的主要代理。kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
输出内容类似如下:
Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
在上面的输出中,
Leader: 1
表示topic-failover-test
的主要代理是代理 1。此代理对应于 Podkafka-1
。打开新终端并连接到同一集群。
gcloud container clusters get-credentials gke-kafka-us-west1 --location us-west1 --project PROJECT_ID
查找 Pod
kafka-1
在哪个节点上运行。kubectl get pod -n kafka kafka-1 -o wide
输出内容类似如下:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 1 (35m ago) 36m 192.168.132.4 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 <none> <none>
在上面的输出中,您可以看到 Pod
kafka-1
在节点gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
上运行。排空该节点以逐出 Pod。
kubectl drain NODE \ --delete-emptydir-data \ --force \ --ignore-daemonsets
将 NODE 替换为 Pod kafka-1 在其上运行的节点。在此示例中,该节点为
gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
。输出内容类似如下:
node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578 evicting pod kafka/kafka-1 evicting pod kube-system/kube-dns-7d48cb57b-j4d8f evicting pod kube-system/calico-typha-56995c8d85-5clph pod/calico-typha-56995c8d85-5clph evicted pod/kafka-1 evicted pod/kube-dns-7d48cb57b-j4d8f evicted node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
查找 Pod
kafka-1
在哪个节点上运行。kubectl get pod -n kafka kafka-1 -o wide
输出应类似如下所示:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 0 2m49s 192.168.128.8 gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7 <none> <none>
从上面的输出中,您可以看到应用在新节点上运行。
在连接到
kafka-client
Pod 的终端中,确定哪个代理是topic-failover-test
的主要代理。kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
输出应类似如下所示:
Topic: topic-failover-test TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 0,2,1
在示例输出中,主要代理仍为 1。不过,它当前在新的节点上运行。
测试 Kafka 主要代理故障
在 Cloud Shell 中,连接到 Kafka 客户端,然后使用
describe
查看为topic1
中的每个分区选举的主要代理。kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
输出内容类似如下:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: topic1 Partition: 1 Leader: 0 Replicas: 2,1,0 Isr: 0,2,1 Topic: topic1 Partition: 2 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1
在未连接到 Kafka 客户端的 Cloud Shell 中,删除
kafka-0
主要代理以强制选举新的主要代理。您应该删除映射到上述输出中的一个主要代理的索引。kubectl delete pod -n kafka kafka-0 --force
输出内容类似如下:
pod "kafka-0" force deleted
在连接到 Kafka 客户端的 Cloud Shell 中,使用
describe
查看选举的主要代理。kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
输出内容类似如下:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
在输出中,如果每个分区的新主要代理已分配给中断的主要代理 (
kafka-0
),则新主要代理会发生更改。这表示删除并重新创建 Pod 时,原始主要代理已被替换。