Implemente um cluster do Kafka altamente disponível no GKE

O Kafka é um sistema de mensagens de publicação/subscrição distribuído de código aberto para processar dados de streaming em tempo real, de alto volume e de alto débito. Pode usar o Kafka para criar pipelines de dados de streaming que movem dados de forma fiável em diferentes sistemas e aplicações para processamento e análise.

Este tutorial destina-se a administradores de plataformas, arquitetos da nuvem e profissionais de operações interessados na implementação de clusters Kafka altamente disponíveis no Google Kubernetes Engine (GKE).

Crie a infraestrutura do cluster

Nesta secção, vai executar um script do Terraform para criar dois clusters do GKE regionais. O cluster principal vai ser implementado em us-central1.

Para criar o cluster, siga estes passos:

Piloto automático

No Cloud Shell, execute os seguintes comandos:

terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID

Quando lhe for pedido, escreva yes.

Standard

No Cloud Shell, execute os seguintes comandos:

terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID

Quando lhe for pedido, escreva yes.

Os ficheiros de configuração do Terraform criam os seguintes recursos para implementar a sua infraestrutura:

  • Crie um repositório do Artifact Registry para armazenar as imagens Docker.
  • Crie a rede VPC e a sub-rede para a interface de rede da VM.
  • Crie dois clusters do GKE.

O Terraform cria um cluster privado nas duas regiões e ativa a Cópia de segurança do GKE para recuperação de desastres.

Implemente o Kafka no seu cluster

Nesta secção, vai implementar o Kafka no GKE através de um gráfico Helm. A operação cria os seguintes recursos:

Para usar o gráfico Helm para implementar o Kafka, siga estes passos:

  1. Configure o acesso ao Docker.

    gcloud auth configure-docker us-docker.pkg.dev
    
  2. Preencha o Artifact Registry com as imagens do Kafka e do Zookeeper.

    ./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0
    ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52
    ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41
    ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
    
  3. Configure o acesso à linha de comandos kubectl ao cluster principal.

    gcloud container clusters get-credentials gke-kafka-us-central1 \
        --location=${REGION} \
        --project=${PROJECT_ID}
    
  4. Crie um espaço de nomes.

    export NAMESPACE=kafka
    kubectl create namespace $NAMESPACE
    
  5. Instale o Kafka através da versão 20.0.6 do gráfico Helm.

    cd helm
    ../scripts/chart.sh kafka 20.0.6 && \
    rm -rf Chart.lock charts && \
    helm dependency update && \
    helm -n kafka upgrade --install kafka . \
    --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
    
    

    O resultado é semelhante ao seguinte:

    NAME: kafka
    LAST DEPLOYED: Thu Feb 16 03:29:39 2023
    NAMESPACE: kafka
    STATUS: deployed
    REVISION: 1
    TEST SUITE: None
    
  6. Verifique se as réplicas do Kafka estão em execução (esta ação pode demorar alguns minutos).

    kubectl get all -n kafka
    

    O resultado é semelhante ao seguinte:

    ---
    NAME                    READY   STATUS    RESTARTS        AGE
    pod/kafka-0             1/1     Running   2 (3m51s ago)   4m28s
    pod/kafka-1             1/1     Running   3 (3m41s ago)   4m28s
    pod/kafka-2             1/1     Running   2 (3m57s ago)   4m28s
    pod/kafka-zookeeper-0   1/1     Running   0               4m28s
    pod/kafka-zookeeper-1   1/1     Running   0               4m28s
    pod/kafka-zookeeper-2   1/1     Running   0               4m28s
    
    NAME                                   TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)                      AGE
    service/kafka                          ClusterIP   192.168.112.124   <none>        9092/TCP                     4m29s
    service/kafka-app                      ClusterIP   192.168.75.57     <none>        9092/TCP                     35m
    service/kafka-app-headless             ClusterIP   None              <none>        9092/TCP,9093/TCP            35m
    service/kafka-app-zookeeper            ClusterIP   192.168.117.102   <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-app-zookeeper-headless   ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   35m
    service/kafka-headless                 ClusterIP   None              <none>        9092/TCP,9093/TCP            4m29s
    service/kafka-zookeeper                ClusterIP   192.168.89.249    <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    service/kafka-zookeeper-headless       ClusterIP   None              <none>        2181/TCP,2888/TCP,3888/TCP   4m29s
    
    NAME                               READY   AGE
    statefulset.apps/kafka             3/3     4m29s
    statefulset.apps/kafka-zookeeper   3/3     4m29s
    

Crie dados de teste

Nesta secção, vai testar a aplicação Kafka e gerar mensagens.

  1. Crie um pod de cliente consumidor para interagir com a aplicação Kafka.

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
    
  2. Crie um tópico com o nome topic1 com três partições e um fator de replicação de três.

    kafka-topics.sh \
        --create \
        --topic topic1 \
        --partitions 3  \
        --replication-factor 3 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. Verifique se as partições de tópicos são replicadas nos três agentes.

    kafka-topics.sh \
        --describe \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    O resultado é semelhante ao seguinte:

    Topic: topic1     TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
           Topic: topic1    Partition: 0    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
           Topic: topic1    Partition: 1    Leader: 1       Replicas: 1,2,0 Isr: 1,2,0
           Topic: topic1    Partition: 2    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
    

    No exemplo de resultado, repare que topic1 tem três partições, cada uma com um líder e um conjunto de réplicas diferentes. Isto deve-se ao facto de o Kafka usar a divisão em partições para distribuir os dados por vários agentes, o que permite uma maior escalabilidade e tolerância a falhas. O fator de replicação de três garante que cada partição tem três réplicas, para que os dados continuem disponíveis mesmo que um ou dois agentes falhem.

  4. Execute o seguinte comando para gerar números de mensagens em massa no formato topic1.

    ALLOW_PLAINTEXT_LISTENER=yes
    for x in $(seq 0 200); do
      echo "$x: Message number $x"
    done | kafka-console-producer.sh \
        --topic topic1 \
        --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \
        --property parse.key=true \
        --property key.separator=":"
    
  5. Execute o seguinte comando para consumir topic1 de todas as partições.

    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    Escreva CTRL+C para parar o processo do consumidor.

Teste de referência do Kafka

Para modelar com precisão um exemplo de utilização, pode executar uma simulação da carga esperada no cluster. Para testar o desempenho, vai usar as ferramentas incluídas no pacote Kafka, nomeadamente os scripts kafka-producer-perf-test.sh e kafka-consumer-perf-test.sh na pasta bin.

  1. Crie um tópico para testes de referência.

    kafka-topics.sh \
      --create \
      --topic topic-benchmark \
      --partitions 3  \
      --replication-factor 3 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  2. Crie carga no cluster Kafka.

    KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \
        --topic topic-benchmark \
        --num-records 10000000 \
        --throughput -1 \
        --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \
              batch.size=16384 \
              acks=all \
              linger.ms=500 \
              compression.type=none \
        --record-size 100 \
        --print-metrics
    

    O produtor vai gerar 10 000 000 de registos a topic-benchmark. O resultado é semelhante ao seguinte:

    623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency.
    1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency.
    1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency.
    2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
    

    Depois de todos os registos terem sido enviados, deve ver métricas adicionais no resultado, semelhantes às seguintes:

    producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark}     : 173316.233
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark}    : 10000000.000
    

    Para sair do relógio, escreva CTRL + C.

  3. Saia da shell do Pod.

    exit
    

Faça a gestão das atualizações

As atualizações de versões do Kafka e do Kubernetes são lançadas regularmente. Siga as práticas recomendadas operacionais para atualizar regularmente o seu ambiente de software.

Planeie atualizações binárias do Kafka

Nesta secção, vai atualizar a imagem do Kafka através do Helm e verificar se os seus tópicos ainda estão disponíveis.

Para atualizar a versão anterior do Kafka a partir do gráfico Helm que usou em Implemente o Kafka no seu cluster, siga estes passos:

  1. Preencha o Artifact Registry com a seguinte imagem:

    ../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2
    ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61
    ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49
    ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
    
  2. Execute estes passos para implementar um gráfico Helm com as imagens atualizadas do Kafka e do Zookeeper. Para ver orientações específicas da versão, consulte as instruções do Kafka para atualizações de versões.

    1. Atualize a versão da dependência Chart.yaml:
    ../scripts/chart.sh kafka 20.1.0
    
    
    1. Implemente o gráfico Helm com as novas imagens do Kafka e do Zookeeper, conforme mostrado no seguinte exemplo:

      rm -rf Chart.lock charts && \
      helm dependency update && \
      helm -n kafka upgrade --install kafka ./ \
            --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
      

    Veja a atualização dos pods do Kafka:

    kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
    

    Para sair do relógio, escreva CTRL + C.

  3. Ligue-se ao cluster do Kafka através de um pod de cliente.

    kubectl run kafka-client -n kafka --rm -ti \
      --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
    
  4. Confirme que consegue aceder às mensagens de topic1.

    kafka-console-consumer.sh \
      --topic topic1 \
      --from-beginning \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    O resultado deve mostrar as mensagens geradas no passo anterior. Escreva CTRL+C para sair do processo.

  5. Saia do pod do cliente.

    exit
    

Prepare-se para a recuperação de desastres

Para garantir que as suas cargas de trabalho de produção permanecem disponíveis em caso de um evento que interrompa o serviço, deve preparar um plano de recuperação de desastres (RD). Para saber mais sobre o planeamento de recuperação de desastres, consulte o Guia de planeamento de recuperação de desastres.

Para fazer uma cópia de segurança e restaurar as suas cargas de trabalho em clusters do GKE, pode usar a cópia de segurança do GKE.

Exemplo de um cenário de cópia de segurança e restauro do Kafka

Nesta secção, vai fazer uma cópia de segurança do cluster a partir de gke-kafka-us-central1 e restaurar a cópia de segurança em gke-kafka-us-west1. Vai realizar a operação de cópia de segurança e restauro ao nível da aplicação, usando o recurso personalizado ProtectedApplication.

O diagrama seguinte ilustra os componentes da solução de recuperação de desastres e a respetiva relação.

O diagrama mostra um exemplo de solução de cópia de segurança e recuperação para um cluster Kafka de alta disponibilidade.
Figura 3: exemplo de solução de cópia de segurança e recuperação para um cluster Kafka de elevada disponibilidade.

Para se preparar para fazer uma cópia de segurança e restaurar o cluster Kafka, siga estes passos:

  1. Configure as variáveis de ambiente.

    export BACKUP_PLAN_NAME=kafka-protected-app
    export BACKUP_NAME=protected-app-backup-1
    export RESTORE_PLAN_NAME=kafka-protected-app
    export RESTORE_NAME=protected-app-restore-1
    export REGION=us-central1
    export DR_REGION=us-west1
    export CLUSTER_NAME=gke-kafka-$REGION
    export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
    
  2. Verifique se o cluster está no estado RUNNING.

    gcloud container clusters describe $CLUSTER_NAME --location us-central1 --format='value(status)'
    
  3. Crie um plano de cópia de segurança.

    gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --include-secrets \
        --include-volume-data \
        --cron-schedule="0 3 * * *" \
        --backup-retain-days=7 \
        --backup-delete-lock-days=0
    
  4. Crie manualmente uma cópia de segurança. Embora as cópias de segurança agendadas sejam normalmente regidas pelo agendamento cron no plano de cópia de segurança, o exemplo seguinte mostra como pode iniciar uma operação de cópia de segurança única.

    gcloud beta container backup-restore backups create $BACKUP_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=$BACKUP_PLAN_NAME \
        --wait-for-completion
    
  5. Crie um plano de restauro.

    gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \
        --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \
        --cluster-resource-conflict-policy=use-existing-version \
        --namespaced-resource-restore-mode=delete-and-restore \
        --volume-data-restore-policy=restore-volume-data-from-backup \
        --selected-applications=kafka/kafka,kafka/zookeeper \
        --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
    
  6. Restaure manualmente a partir de uma cópia de segurança.

    gcloud beta container backup-restore restores create $RESTORE_NAME \
        --project=$PROJECT_ID \
        --location=$DR_REGION \
        --restore-plan=$RESTORE_PLAN_NAME \
        --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
    
  7. Veja a aplicação restaurada aparecer no cluster de cópia de segurança. Pode demorar alguns minutos até que todos os pods estejam em execução e prontos.

    gcloud container clusters get-credentials gke-kafka-us-west1 \
        --location us-west1
    kubectl get pod -n kafka --watch
    

    Escreva CTRL+C para sair do relógio quando todos os pods estiverem em funcionamento.

  8. Valide se os tópicos anteriores podem ser obtidos por um consumidor.

    kubectl run kafka-client -n kafka --rm -ti \
        --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
    kafka-console-consumer.sh \
        --bootstrap-server kafka.kafka.svc.cluster.local:9092 \
        --topic topic1 \
        --property print.key=true \
        --property key.separator=" : " \
        --from-beginning;
    

    O resultado é semelhante ao seguinte:

    192 :  Message number 192
    193 :  Message number 193
    197 :  Message number 197
    200 :  Message number 200
    Processed a total of 201 messages
    

    Escreva CTRL+C para sair do processo.

  9. Saia do Pod.

    exit
    

Simule uma interrupção do serviço Kafka

Nesta secção, vai simular uma falha de nó substituindo um nó do Kubernetes que aloja o agente. Esta secção aplica-se apenas ao tipo Padrão. O Autopilot gere os seus nós, pelo que não é possível simular a falha de nós.

  1. Crie um pod de cliente para estabelecer ligação à aplicação Kafka.

    kubectl run kafka-client -n kafka --restart='Never' -it \
    --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
    
  2. Crie o tópico topic-failover-test e gere tráfego de teste.

    kafka-topics.sh \
      --create \
      --topic topic-failover-test \
      --partitions 1  \
      --replication-factor 3  \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    
  3. Determine que agente é o líder do tópico topic-failover-test.

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    O resultado é semelhante ao seguinte:

    Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2
    

    Na saída acima, Leader: 1 significa que o líder de topic-failover-test é o agente 1. Isto corresponde ao Pod kafka-1.

  4. Abra um novo terminal e ligue-se ao mesmo cluster.

    gcloud container clusters get-credentials gke-kafka-us-west1 --location us-west1 --project PROJECT_ID
    
  5. Descubra em que nó o pod kafka-1 está a ser executado.

    kubectl get pod -n kafka kafka-1 -o wide
    

    O resultado é semelhante ao seguinte:

    NAME      READY   STATUS    RESTARTS      AGE   IP              NODE                                               NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   1 (35m ago)   36m   192.168.132.4   gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72   <none>           <none>
    

    No resultado acima, vê que o pod kafka-1 está a ser executado no nó gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72.

  6. Esvazie o nó para despejar os pods.

    kubectl drain NODE \
      --delete-emptydir-data \
      --force \
      --ignore-daemonsets
    

    Substitua NODE pelo pod do nó em que o kafka-1 está a ser executado. Neste exemplo, o nó é gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72.

    O resultado é semelhante ao seguinte:

    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned
    Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578
    evicting pod kafka/kafka-1
    evicting pod kube-system/kube-dns-7d48cb57b-j4d8f
    evicting pod kube-system/calico-typha-56995c8d85-5clph
    pod/calico-typha-56995c8d85-5clph evicted
    pod/kafka-1 evicted
    pod/kube-dns-7d48cb57b-j4d8f evicted
    node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
    
  7. Descubra em que nó o pod kafka-1 está a ser executado.

    kubectl get pod -n kafka kafka-1 -o wide
    

    O resultado deve ser semelhante ao seguinte:

    NAME      READY   STATUS    RESTARTS   AGE     IP              NODE                                              NOMINATED NODE   READINESS GATES
    kafka-1   2/2     Running   0          2m49s   192.168.128.8   gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7   <none>           <none>
    

    A partir do resultado acima, vê que a aplicação está a ser executada num novo nó.

  8. No terminal ligado ao kafka-client pod, determine que agente é o líder para topic-failover-test.

    kafka-topics.sh --describe \
      --topic topic-failover-test \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    O resultado deve ser semelhante ao seguinte:

    Topic: topic-failover-test     TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic-failover-test     Partition: 0    Leader: 1       Replicas: 1,0,2 Isr: 0,2,1
    

    No exemplo de saída, a variante líder continua a ser 1 . No entanto, agora está a ser executado num novo nó.

Teste de falha do líder do Kafka

  1. No Cloud Shell, ligue-se ao cliente Kafka e use describe para ver o líder eleito para cada partição em topic1.

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    O resultado é semelhante ao seguinte:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: topic1   Partition: 1    Leader: 0       Replicas: 2,1,0 Isr: 0,2,1
        Topic: topic1   Partition: 2    Leader: 0       Replicas: 1,0,2 Isr: 0,2,1
    
  2. No Cloud Shell não ligado ao cliente Kafka, elimine o agente principal kafka-0 para forçar uma nova eleição de principal. Deve eliminar o índice que mapeia um dos líderes na saída anterior.

    kubectl delete pod -n kafka kafka-0 --force
    

    O resultado é semelhante ao seguinte:

    pod "kafka-0" force deleted
    
  3. No Cloud Shell ligado ao cliente Kafka, use describe para ver o líder selecionado.

    kafka-topics.sh --describe \
      --topic topic1 \
      --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
    

    O resultado é semelhante ao seguinte:

    Topic: topic1   TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3       ReplicationFactor: 3    Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824
        Topic: topic1   Partition: 0    Leader: 2       Replicas: 0,1,2 Isr: 2,0,1
        Topic: topic1   Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
        Topic: topic1   Partition: 2    Leader: 2       Replicas: 1,2,0 Isr: 2,0,1
    

    Na saída, o novo líder de cada partição muda, se tiver sido atribuído ao líder que foi interrompido (kafka-0). Isto indica que o líder original foi substituído quando o pod foi eliminado e recriado.