Desplegar un clúster de Kafka de alta disponibilidad en GKE

Kafka es un sistema de mensajería de publicación y suscripción distribuido y de código abierto para gestionar datos de streaming en tiempo real, de alto volumen y de alto rendimiento. Puedes usar Kafka para crear flujos de procesamiento de datos que transfieran datos de forma fiable entre diferentes sistemas y aplicaciones para procesarlos y analizarlos.

Este tutorial está dirigido a administradores de plataformas, arquitectos de la nube y profesionales de operaciones que quieran desplegar clústeres de Kafka de alta disponibilidad en Google Kubernetes Engine (GKE).

Crear la infraestructura del clúster

En esta sección, ejecutarás una secuencia de comandos de Terraform para crear dos clústeres de GKE regionales. El clúster principal se desplegará en us-central1.

Para crear el clúster, sigue estos pasos:

Autopilot

En Cloud Shell, ejecuta los siguientes comandos:

terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID

Cuando se te solicite, escribe yes.

Estándar

En Cloud Shell, ejecuta los siguientes comandos:

terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID

Cuando se te solicite, escribe yes.

Los archivos de configuración de Terraform crean los siguientes recursos para desplegar tu infraestructura:

  • Crea un repositorio de Artifact Registry para almacenar las imágenes de Docker.
  • Crea la red VPC y la subred de la interfaz de red de la VM.
  • Crea dos clústeres de GKE.

Terraform crea un clúster privado en las dos regiones y habilita Copia de seguridad de GKE para la recuperación ante desastres.

Desplegar Kafka en un clúster

En esta sección, desplegarás Kafka en GKE mediante un gráfico de Helm. La operación crea los siguientes recursos:

Para usar el gráfico de Helm e implementar Kafka, sigue estos pasos:

  1. Configura el acceso a Docker.

    gcloud auth configure-docker us-docker.pkg.dev
    
  2. Rellena Artifact Registry con las imágenes de Kafka y Zookeeper.

    ./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0
    ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52
    ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41
    ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
    
  3. Configura el acceso a la línea de comandos kubectl al clúster principal.

    gcloud container clusters get-credentials gke-kafka-us-central1 \
        --location=${REGION} \
        --project=${PROJECT_ID}
    
  4. Crea un espacio de nombres.

    export NAMESPACE=kafka
    kubectl create namespace $NAMESPACE
    
  5. Instala Kafka con la versión 20.0.6 del gráfico de Helm.

    cd helm
    ../scripts/chart.sh kafka 20.0.6 && \
    rm -rf Chart.lock charts && \
    helm dependency update && \
    helm -n kafka upgrade --install kafka . \
    --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
    
    

    El resultado debería ser similar al siguiente:

    NAME: kafka
    LAST DEPLOYED: Thu Feb 16 03:29:39 2023
    NAMESPACE: kafka
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    
  6. Verifica que tus réplicas de Kafka estén en ejecución (puede tardar unos minutos).

    kubectl get all -n kafka
    

    El resultado debería ser similar al siguiente:

    ---
    NAME                    READY   STATUS    RESTARTS        AGE
    pod/kafka-0             1/1     Running   2 (3m51s ago)   4m28s
    pod/kafka-1             1/1     Running   3 (3m41s ago)   4m28s
    pod/kafka-2             1/1     Running   2 (3m57s ago)   4m28s
    pod/kafka-zookeeper-0   1/1     Running   0               4m28s
    pod/kafka-zookeeper-1   1/1     Running   0               4m28s
    pod/kafka-zookeeper-2   1/1     Running   0               4m28s
    
    NAME                                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                      AGE
    service/kafka                          ClusterIP   192.168.112.124   <none>        9092/TCP                     4m29s
    service/kafka-app                      ClusterIP   192.168.75.57     <none>        9092/TCP                     35m
    service/kafka-app-headless             ClusterIP   None              <none>        9092/TCP,9093/TCP            35m
    service/kafka-app-zookeeper            ClusterIP   192.168.117.102   <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-app-zookeeper-headless   ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-headless                 ClusterIP   None              <none>        9092/TCP,9093/TCP            4m29s
    service/kafka-zookeeper                ClusterIP   192.168.89.249    <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    service/kafka-zookeeper-headless       ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    
    NAME                               READY   AGE
    statefulset.apps/kafka             3/3     4m29s
    statefulset.apps/kafka-zookeeper   3/3     4m29s
    

Crear datos de prueba

En esta sección, probarás la aplicación de Kafka y generarás mensajes.

  1. Crea un pod de cliente consumidor para interactuar con la aplicación Kafka.

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
    
  2. Crea un tema llamado topic1 con tres particiones y un factor de replicación de tres.

    kafka-topics.sh \
        --create \
        --topic topic1 \
        --partitions 3  \
        --replication-factor 3 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. Verifica que las particiones del tema se repliquen en los tres brokers.

    kafka-topics.sh \
        --describe \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    El resultado debería ser similar al siguiente:

    Topic: topic1     TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
           Topic: topic1    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
           Topic: topic1    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
           Topic: topic1    Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
    

    En el ejemplo de salida, observe que topic1 tiene tres particiones, cada una con un líder y un conjunto de réplicas diferentes. Esto se debe a que Kafka usa particiones para distribuir los datos entre varios brokers, lo que permite una mayor escalabilidad y tolerancia a fallos. El factor de replicación de tres asegura que cada partición tenga tres réplicas, de modo que los datos sigan estando disponibles aunque fallen uno o dos brokers.

  4. Ejecuta el siguiente comando para generar números de mensajes en bloque en topic1.

    ALLOW_PLAINTEXT_LISTENER=yes
    for x in $(seq 0 200); do
      echo "$x: Message number $x"
    done | kafka-console-producer.sh \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
        --property parse.key=true \
        --property key.separator=":"
    
  5. Ejecuta el siguiente comando para consumir topic1 de todas las particiones.

    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    Escribe CTRL+C para detener el proceso del consumidor.

Comparar el rendimiento de Kafka

Para modelar con precisión un caso práctico, puedes simular la carga esperada en el clúster. Para probar el rendimiento, usarás las herramientas incluidas en el paquete de Kafka, es decir, las secuencias de comandos kafka-producer-perf-test.sh y kafka-consumer-perf-test.sh de la carpeta bin.

  1. Crea un tema para la comparativa.

    kafka-topics.sh \
      --create \
      --topic topic-benchmark \
      --partitions 3  \
      --replication-factor 3 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  2. Crea carga en el clúster de Kafka.

    KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \
        --topic topic-benchmark \
        --num-records 10000000 \
        --throughput -1 \
        --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \
              batch.size=16384 \
              acks=all \
              linger.ms=500 \
              compression.type=none \
        --record-size 100 \
        --print-metrics
    

    El productor generará 10.000.000 de registros el topic-benchmark. La salida es similar a la siguiente:

    623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency.
    1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency.
    1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency.
    2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
    

    Una vez que se hayan enviado todos los registros, debería ver métricas adicionales en la salida, como las siguientes:

    producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark}     : 173316.233
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark}    : 10000000.000
    

    Para salir del reloj, escribe CTRL + C.

  3. Sal del shell del Pod.

    exit
    

Gestionar licencias

Las actualizaciones de las versiones de Kafka y Kubernetes se publican de forma periódica. Sigue las prácticas recomendadas operativas para actualizar tu entorno de software con regularidad.

Planificar las actualizaciones binarias de Kafka

En esta sección, actualizarás la imagen de Kafka con Helm y verificarás que tus temas siguen disponibles.

Para actualizar la versión anterior de Kafka desde el gráfico de Helm que has usado en Desplegar Kafka en tu clúster, sigue estos pasos:

  1. Rellena Artifact Registry con la siguiente imagen:

    ../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2
    ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61
    ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49
    ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
    
  2. Sigue estos pasos para implementar un gráfico de Helm con las imágenes actualizadas de Kafka y Zookeeper. Para obtener instrucciones específicas de una versión, consulta las instrucciones de Kafka para actualizar versiones.

    1. Actualiza la versión de la dependencia Chart.yaml:
    ../scripts/chart.sh kafka 20.1.0
    
    
    1. Despliega el gráfico de Helm con las nuevas imágenes de Kafka y Zookeeper, tal como se muestra en el siguiente ejemplo:

      rm -rf Chart.lock charts && \
      helm dependency update && \
      helm -n kafka upgrade --install kafka ./ \
            --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
      

    Observa cómo se actualizan los pods de Kafka:

    kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
    

    Para salir del reloj, escribe CTRL + C.

  3. Conéctate al clúster de Kafka mediante un pod de cliente.

    kubectl run kafka-client -n kafka --rm -ti \
      --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
  4. Comprueba que puedes acceder a los mensajes de topic1.

    kafka-console-consumer.sh \
      --topic topic1 \
      --from-beginning \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    En la salida se deben mostrar los mensajes generados en el paso anterior. Escribe CTRL+C para salir del proceso.

  5. Sal del pod del cliente.

    exit
    

Prepararse para la recuperación tras fallos

Para asegurarte de que tus cargas de trabajo de producción sigan estando disponibles en caso de que se produzca un evento que interrumpa el servicio, debes preparar un plan de recuperación tras fallos. Para obtener más información sobre la planificación de la recuperación tras fallos, consulta la guía de planificación de la recuperación tras fallos.

Para crear copias de seguridad de tus cargas de trabajo en clústeres de GKE y restaurarlas, puedes usar Copia de seguridad de GKE.

.

Ejemplo de situación de copia de seguridad y restauración de Kafka

En esta sección, crearás una copia de seguridad de tu clúster desde gke-kafka-us-central1 y la restaurarás en gke-kafka-us-west1. Realizarás la operación de copia de seguridad y restauración en el ámbito de la aplicación, usando el ProtectedApplicationrecurso personalizado.

En el siguiente diagrama se muestran los componentes de la solución de recuperación ante desastres y cómo se relacionan entre sí.

En el diagrama se muestra un ejemplo de solución de copia de seguridad y recuperación para un clúster de Kafka de alta disponibilidad.
Imagen 3: ejemplo de solución de copia de seguridad y recuperación para un clúster de Kafka de alta disponibilidad.

Para preparar la copia de seguridad y la restauración de tu clúster de Kafka, sigue estos pasos:

  1. Configura las variables de entorno.

    export BACKUP_PLAN_NAME=kafka-protected-app
    export BACKUP_NAME=protected-app-backup-1
    export RESTORE_PLAN_NAME=kafka-protected-app
    export RESTORE_NAME=protected-app-restore-1
    export REGION=us-central1
    export DR_REGION=us-west1
    export CLUSTER_NAME=gke-kafka-$REGION
    export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
    
  2. Verifica que el clúster esté en estado RUNNING.

    gcloud container clusters describe $CLUSTER_NAME --location us-central1 --format='value(status)'
    
  3. Crea un plan de copia de seguridad.

    gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --include-secrets \
        --include-volume-data \
        --cron-schedule="0 3 * * *" \
        --backup-retain-days=7 \
        --backup-delete-lock-days=0
    
  4. Crea una copia de seguridad manualmente. Aunque las copias de seguridad programadas se rigen por la programación cron del plan de copias de seguridad, en el siguiente ejemplo se muestra cómo puedes iniciar una operación de copia de seguridad única.

    gcloud beta container backup-restore backups create $BACKUP_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=$BACKUP_PLAN_NAME \
        --wait-for-completion
    
  5. Crea un plan de restauración.

    gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \
        --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \
        --cluster-resource-conflict-policy=use-existing-version \
        --namespaced-resource-restore-mode=delete-and-restore \
        --volume-data-restore-policy=restore-volume-data-from-backup \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
    
  6. Restaurar manualmente a partir de una copia de seguridad.

    gcloud beta container backup-restore restores create $RESTORE_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --restore-plan=$RESTORE_PLAN_NAME \
        --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
    
  7. Observa cómo aparece la aplicación restaurada en el clúster de copia de seguridad. Puede que tarden unos minutos en ejecutarse y estar listos todos los pods.

    gcloud container clusters get-credentials gke-kafka-us-west1 \
        --location us-west1
    kubectl get pod -n kafka --watch
    

    Escribe CTRL+C para salir del reloj cuando todos los pods estén en funcionamiento.

  8. Valida que un consumidor pueda obtener los temas anteriores.

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    El resultado debería ser similar al siguiente:

    192 :  Message number 192
    193 :  Message number 193
    197 :  Message number 197
    200 :  Message number 200
    Processed a total of 201 messages
    

    Escribe CTRL+C para salir del proceso.

  9. Salir del Pod.

    exit
    

Simular una interrupción del servicio de Kafka

En esta sección, simularás un fallo de nodo sustituyendo un nodo de Kubernetes que aloja el broker. Esta sección solo se aplica a la edición Standard. Autopilot gestiona los nodos por ti, por lo que no se puede simular un fallo de nodo.

  1. Crea un pod de cliente para conectarte a la aplicación Kafka.

    kubectl run kafka-client -n kafka --restart='Never' -it \
    --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
  2. Crea el tema topic-failover-test y genera tráfico de prueba.

    kafka-topics.sh \
      --create \
      --topic topic-failover-test \
      --partitions 1  \
      --replication-factor 3  \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. Determina qué broker es el líder del tema topic-failover-test.

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    El resultado debería ser similar al siguiente:

    Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
    

    En el resultado anterior, Leader: 1 significa que el líder de topic-failover-test es el broker 1. Corresponde al pod kafka-1.

  4. Abre un nuevo terminal y conéctate al mismo clúster.

    gcloud container clusters get-credentials gke-kafka-us-west1 --location us-west1 --project PROJECT_ID
    
  5. Busca en qué nodo se está ejecutando el pod kafka-1.

    kubectl get pod -n kafka kafka-1 -o wide
    

    El resultado debería ser similar al siguiente:

    NAME      READY   STATUS    RESTARTS      AGE   IP              NODE                                               NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   1 (35m ago)   36m   192.168.132.4   gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72   <none>           <none>
    

    En el resultado anterior, se muestra que el pod kafka-1 se está ejecutando en el nodo gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72.

  6. Drena el nodo para desalojar los pods.

    kubectl drain NODE \
      --delete-emptydir-data \
      --force \
      --ignore-daemonsets
    

    Sustituye NODE por el nodo en el que se ejecuta el pod kafka-1. En este ejemplo, el nodo es gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72.

    El resultado debería ser similar al siguiente:

    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned
    Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578
    evicting pod kafka/kafka-1
    evicting pod kube-system/kube-dns-7d48cb57b-j4d8f
    evicting pod kube-system/calico-typha-56995c8d85-5clph
    pod/calico-typha-56995c8d85-5clph evicted
    pod/kafka-1 evicted
    pod/kube-dns-7d48cb57b-j4d8f evicted
    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
    
  7. Busca en qué nodo se está ejecutando el pod kafka-1.

    kubectl get pod -n kafka kafka-1 -o wide
    

    La salida debería ser similar a la siguiente:

    NAME      READY   STATUS    RESTARTS   AGE     IP              NODE                                              NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   0          2m49s   192.168.128.8   gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7   <none>           <none>
    

    En el resultado anterior, se puede ver que la aplicación se está ejecutando en un nuevo nodo.

  8. En el terminal conectado al pod kafka-client, determina qué broker es el líder de topic-failover-test.

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    La salida debería ser similar a la siguiente:

    Topic: topic-failover-test     TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 0,2,1
    

    En el ejemplo de salida, el líder sigue siendo 1 . Sin embargo, ahora se ejecuta en un nuevo nodo.

Prueba de error de líder de Kafka

  1. En Cloud Shell, conéctate al cliente de Kafka y usa describe para ver el líder elegido de cada partición en topic1.

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    El resultado debería ser similar al siguiente:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic1   Partition: 1    Leader: 0       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic1   Partition: 2    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
    
  2. En Cloud Shell, que no está conectado al cliente de Kafka, elimina el broker líder kafka-0 para forzar una nueva elección de líder. Debes eliminar el índice que se asigna a uno de los líderes en el resultado anterior.

    kubectl delete pod -n kafka kafka-0 --force
    

    El resultado debería ser similar al siguiente:

    pod "kafka-0" force deleted
    
  3. En Cloud Shell conectado al cliente de Kafka, usa describe para ver el líder elegido.

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    El resultado debería ser similar al siguiente:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: topic1   Partition: 2    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1
    

    En el resultado, el nuevo líder de cada partición cambia si se ha asignado al líder que se ha interrumpido (kafka-0). Esto indica que el líder original se ha sustituido cuando se ha eliminado y se ha vuelto a crear el pod.