在 GKE 上部署高可用性 Kafka 集群

Kafka 是一个开源分布式发布-订阅消息传递系统,用于处理大量、高吞吐量和实时的流式数据。您可以使用 Kafka 构建流式数据流水线,以在不同系统和应用之间可靠地移动数据来进行处理和分析。

本教程适用于想要在 Google Kubernetes Engine (GKE) 上部署高可用性 Kafka 集群的平台管理员、云架构师和运维专业人员。

创建集群基础架构

在本部分中,您将运行 Terraform 脚本来创建两个区域级 GKE 集群。主要集群将部署在 us-central1 中。

如需创建集群,请运行以下命令:

Autopilot

在 Cloud Shell 中,运行以下命令:

terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID

出现提示时,请输入 yes

Standard

在 Cloud Shell 中,运行以下命令:

terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID

出现提示时,请输入 yes

Terraform 配置文件会创建以下资源来部署基础架构:

  • 创建 Artifact Registry 制品库以存储 Docker 映像。
  • 为虚拟机的网络接口创建 VPC 网络和子网。
  • 创建两个 GKE 集群。

Terraform 会在两个区域中创建专用集群,并启用 Backup for GKE 以用于灾难恢复。

在集群上部署 Kafka

在本部分中,您将使用 Helm 图表在 GKE 上部署 Kafka。该操作会创建以下资源:

  • Kafka 和 Zookeeper StatefulSet。
  • Kafka 导出器部署。导出器会收集 Kafka 指标以供 Prometheus 使用。
  • Pod 中断预算 (PDB),用于限制在主动中断期间离线 Pod 的数量。

如需使用 Helm 图表部署 Kafka,请按照以下步骤操作:

  1. 配置 Docker 访问权限。

    gcloud auth configure-docker us-docker.pkg.dev
    
  2. 使用 Kafka 和 Zookeeper 映像填充 Artifact Registry。

    ./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0
    ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52
    ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41
    ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
    
  3. 配置对主要集群的 kubectl 命令行访问权限。

    gcloud container clusters get-credentials gke-kafka-us-central1 \
        --location=${REGION} \
        --project=${PROJECT_ID}
    
  4. 创建命名空间。

    export NAMESPACE=kafka
    kubectl create namespace $NAMESPACE
    
  5. 使用 Helm 图表 20.0.6 版安装 Kafka。

    cd helm
    ../scripts/chart.sh kafka 20.0.6 && \
    rm -rf Chart.lock charts && \
    helm dependency update && \
    helm -n kafka upgrade --install kafka . \
    --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
    
    

    输出类似于以下内容:

    NAME: kafka
    LAST DEPLOYED: Thu Feb 16 03:29:39 2023
    NAMESPACE: kafka
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    
  6. 验证 Kafka 副本是否正在运行(这可能需要几分钟的时间)。

    kubectl get all -n kafka
    

    输出内容类似如下:

    ---
    NAME                    READY   STATUS    RESTARTS        AGE
    pod/kafka-0             1/1     Running   2 (3m51s ago)   4m28s
    pod/kafka-1             1/1     Running   3 (3m41s ago)   4m28s
    pod/kafka-2             1/1     Running   2 (3m57s ago)   4m28s
    pod/kafka-zookeeper-0   1/1     Running   0               4m28s
    pod/kafka-zookeeper-1   1/1     Running   0               4m28s
    pod/kafka-zookeeper-2   1/1     Running   0               4m28s
    
    NAME                                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                      AGE
    service/kafka                          ClusterIP   192.168.112.124   <none>        9092/TCP                     4m29s
    service/kafka-app                      ClusterIP   192.168.75.57     <none>        9092/TCP                     35m
    service/kafka-app-headless             ClusterIP   None              <none>        9092/TCP,9093/TCP            35m
    service/kafka-app-zookeeper            ClusterIP   192.168.117.102   <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-app-zookeeper-headless   ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-headless                 ClusterIP   None              <none>        9092/TCP,9093/TCP            4m29s
    service/kafka-zookeeper                ClusterIP   192.168.89.249    <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    service/kafka-zookeeper-headless       ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    
    NAME                               READY   AGE
    statefulset.apps/kafka             3/3     4m29s
    statefulset.apps/kafka-zookeeper   3/3     4m29s
    

创建测试数据

在本部分中,您将测试 Kafka 应用并生成消息。

  1. 创建与 Kafka 应用进行交互的使用方客户端 Pod。

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
    
  2. 创建一个名为 topic1 的主题,其包含三个分区且复制因子为 3。

    kafka-topics.sh \
        --create \
        --topic topic1 \
        --partitions 3  \
        --replication-factor 3 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. 验证是否已在所有三个代理中复制主题分区。

    kafka-topics.sh \
        --describe \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    输出内容类似如下:

    Topic: topic1     TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
           Topic: topic1    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
           Topic: topic1    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
           Topic: topic1    Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
    

    在示例输出中,请注意 topic1 有三个分区,每个分区都有不同的主要代理和副本集。这是因为 Kafka 使用分区将数据分布到多个代理,从而实现更高的可伸缩性和容错能力。复制因子为 3 可确保每个分区都有三个副本,因此,即使一个或两个代理发生故障,数据也仍然可用。

  4. 运行以下命令以将消息编号批量生成到 topic1 中。

    ALLOW_PLAINTEXT_LISTENER=yes
    for x in $(seq 0 200); do
      echo "$x: Message number $x"
    done | kafka-console-producer.sh \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
        --property parse.key=true \
        --property key.separator=":"
    
  5. 运行以下命令以在所有分区中使用 topic1

    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    CTRL+C 以停止使用方进程。

对 Kafka 进行基准化分析

如需准确为用例建模,您可以对集群上的预期负载运行模拟。如需测试性能,您需要使用 Kafka 软件包中包含的工具,即 bin 文件夹中的 kafka-producer-perf-test.shkafka-consumer-perf-test.sh 脚本。

  1. 创建一个用于基准测试的主题。

    kafka-topics.sh \
      --create \
      --topic topic-benchmark \
      --partitions 3  \
      --replication-factor 3 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  2. 在 Kafka 集群上创建负载。

    KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \
        --topic topic-benchmark \
        --num-records 10000000 \
        --throughput -1 \
        --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \
              batch.size=16384 \
              acks=all \
              linger.ms=500 \
              compression.type=none \
        --record-size 100 \
        --print-metrics
    

    提供方会为 topic-benchmark 生成 1000 万条记录。输出内容类似如下:

    623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency.
    1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency.
    1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency.
    2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
    

    所有记录都发送后,您应该会在输出中看到其他指标,如下所示:

    producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark}     : 173316.233
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark}    : 10000000.000
    

    如需退出监控,请按 CTRL + C

  3. 退出 Pod shell。

    exit
    

管理升级

Kafka 和 Kubernetes 的版本更新会定期发布。请遵循运营最佳实践来定期更新您的软件环境。

规划 Kafka 二进制文件升级

在本部分中,您将使用 Helm 更新 Kafka 映像,并验证主题是否仍然可用。

如需通过在集群上部署 Kafka 中使用的 Helm 图表从早期 Kafka 版本进行升级,请按照以下步骤操作:

  1. 使用以下映像填充 Artifact Registry:

    ../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2
    ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61
    ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49
    ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
    
  2. 执行以下步骤以使用升级后的 Kafka 和 ZooKeeper 映像部署 Helm 图表。如需了解特定于版本的指导信息,请参阅有关版本升级的 Kafka 说明

    1. 更新 Chart.yaml 依赖项版本:
    ../scripts/chart.sh kafka 20.1.0
    
    
    1. 使用新的 Kafka 和 Zookeeper 映像部署 Helm 图表,如以下示例所示:

      rm -rf Chart.lock charts && \
      helm dependency update && \
      helm -n kafka upgrade --install kafka ./ \
            --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
      

    监控 Kafka Pod 是否升级:

    kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
    

    如需退出监控,请按 CTRL + C

  3. 使用客户端 Pod 连接到 Kafka 集群。

    kubectl run kafka-client -n kafka --rm -ti \
      --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
  4. 验证您是否可以访问来自 topic1 的消息。

    kafka-console-consumer.sh \
      --topic topic1 \
      --from-beginning \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    输出应显示上一步中生成的消息。按 CTRL+C 以退出进程。

  5. 退出客户端 pod。

    exit
    

为灾难恢复做好准备

为确保生产工作负载在发生服务中断事件时仍然可用,您应该准备灾难恢复 (DR) 规划。如需详细了解灾难恢复规划,请参阅灾难恢复规划指南

如需在 GKE 集群上备份和恢复工作负载,您可以使用 Backup for GKE

Kafka 备份和恢复场景示例

在本部分中,您将从 gke-kafka-us-central1 备份集群,并将备份恢复到 gke-kafka-us-west1。您将使用 ProtectedApplication 自定义资源在应用范围内执行备份和恢复操作。

下图展示了灾难恢复解决方案的组件,以及相互之间的关系。

图表展示了高可用性 PostgreSQL 集群的备份和恢复解决方案示例。
图 3:高可用性 Kafka 集群的备份和恢复解决方案示例。

如需准备备份和恢复 Kafka 集群,请按照以下步骤操作:

  1. 设置环境变量。

    export BACKUP_PLAN_NAME=kafka-protected-app
    export BACKUP_NAME=protected-app-backup-1
    export RESTORE_PLAN_NAME=kafka-protected-app
    export RESTORE_NAME=protected-app-restore-1
    export REGION=us-central1
    export DR_REGION=us-west1
    export CLUSTER_NAME=gke-kafka-$REGION
    export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
    
  2. 验证集群是否处于 RUNNING 状态。

    gcloud container clusters describe $CLUSTER_NAME --location us-central1 --format='value(status)'
    
  3. 创建备份方案。

    gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --include-secrets \
        --include-volume-data \
        --cron-schedule="0 3 * * *" \
        --backup-retain-days=7 \
        --backup-delete-lock-days=0
    
  4. 手动创建备份。虽然计划备份通常受备份方案中的 cron-schedule 约束,但以下示例展示了如何启动一次性备份操作。

    gcloud beta container backup-restore backups create $BACKUP_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=$BACKUP_PLAN_NAME \
        --wait-for-completion
    
  5. 创建恢复方案。

    gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \
        --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \
        --cluster-resource-conflict-policy=use-existing-version \
        --namespaced-resource-restore-mode=delete-and-restore \
        --volume-data-restore-policy=restore-volume-data-from-backup \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
    
  6. 通过备份手动进行恢复。

    gcloud beta container backup-restore restores create $RESTORE_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --restore-plan=$RESTORE_PLAN_NAME \
        --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
    
  7. 监控恢复的应用是否出现在备份集群中。所有 Pod 可能需要几分钟时间才能运行并准备就绪。

    gcloud container clusters get-credentials gke-kafka-us-west1 \
        --location us-west1
    kubectl get pod -n kafka --watch
    

    CTRL+C 以在所有 Pod 已启动并运行时退出监控。

  8. 验证使用方是否可以提取之前的主题。

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    输出内容类似如下:

    192 :  Message number 192
    193 :  Message number 193
    197 :  Message number 197
    200 :  Message number 200
    Processed a total of 201 messages
    

    CTRL+C 以退出进程。

  9. 退出 Pod。

    exit
    

模拟 Kafka 服务中断

在本部分中,您将通过替换托管代理的 Kubernetes 节点来模拟节点故障。本部分仅适用于 Standard。Autopilot 会为您管理节点,因此无法模拟节点故障。

  1. 创建客户端 Pod 以连接到 Kafka 应用。

    kubectl run kafka-client -n kafka --restart='Never' -it \
    --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
  2. 创建主题 topic-failover-test 并生成测试流量。

    kafka-topics.sh \
      --create \
      --topic topic-failover-test \
      --partitions 1  \
      --replication-factor 3  \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. 确定哪个代理是 topic-failover-test 主题的主要代理。

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    输出内容类似如下:

    Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
    

    在上面的输出中,Leader: 1 表示 topic-failover-test 的主要代理是代理 1。此代理对应于 Pod kafka-1

  4. 打开新终端并连接到同一集群。

    gcloud container clusters get-credentials gke-kafka-us-west1 --location us-west1 --project PROJECT_ID
    
  5. 查找 Pod kafka-1 在哪个节点上运行。

    kubectl get pod -n kafka kafka-1 -o wide
    

    输出内容类似如下:

    NAME      READY   STATUS    RESTARTS      AGE   IP              NODE                                               NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   1 (35m ago)   36m   192.168.132.4   gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72   <none>           <none>
    

    在上面的输出中,您可以看到 Pod kafka-1 在节点 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 上运行。

  6. 排空该节点以逐出 Pod。

    kubectl drain NODE \
      --delete-emptydir-data \
      --force \
      --ignore-daemonsets
    

    NODE 替换为 Pod kafka-1 在其上运行的节点。在此示例中,该节点为 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72

    输出内容类似如下:

    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned
    Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578
    evicting pod kafka/kafka-1
    evicting pod kube-system/kube-dns-7d48cb57b-j4d8f
    evicting pod kube-system/calico-typha-56995c8d85-5clph
    pod/calico-typha-56995c8d85-5clph evicted
    pod/kafka-1 evicted
    pod/kube-dns-7d48cb57b-j4d8f evicted
    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
    
  7. 查找 Pod kafka-1 在哪个节点上运行。

    kubectl get pod -n kafka kafka-1 -o wide
    

    输出应类似如下所示:

    NAME      READY   STATUS    RESTARTS   AGE     IP              NODE                                              NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   0          2m49s   192.168.128.8   gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7   <none>           <none>
    

    从上面的输出中,您可以看到应用在新节点上运行。

  8. 在连接到 kafka-client Pod 的终端中,确定哪个代理是 topic-failover-test 的主要代理。

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    输出应类似如下所示:

    Topic: topic-failover-test     TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 0,2,1
    

    在示例输出中,主要代理仍为 1。不过,它当前在新的节点上运行。

测试 Kafka 主要代理故障

  1. 在 Cloud Shell 中,连接到 Kafka 客户端,然后使用 describe 查看为 topic1 中的每个分区选举的主要代理。

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    输出内容类似如下:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic1   Partition: 1    Leader: 0       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic1   Partition: 2    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
    
  2. 在未连接到 Kafka 客户端的 Cloud Shell 中,删除 kafka-0 主要代理以强制选举新的主要代理。您应该删除映射到上述输出中的一个主要代理的索引。

    kubectl delete pod -n kafka kafka-0 --force
    

    输出内容类似如下:

    pod "kafka-0" force deleted
    
  3. 在连接到 Kafka 客户端的 Cloud Shell 中,使用 describe 查看选举的主要代理。

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    输出内容类似如下:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: topic1   Partition: 2    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1
    

    在输出中,如果每个分区的新主要代理已分配给中断的主要代理 (kafka-0),则新主要代理会发生更改。这表示删除并重新创建 Pod 时,原始主要代理已被替换。