O Kafka é um sistema de mensagens de publicação/assinatura de código aberto e distribuído para lidar com dados de streaming de alto volume, alta capacidade e streaming em tempo real. É possível usar o Kafka para criar pipelines de dados de streaming que movam dados de maneira confiável em diferentes sistemas e aplicativos para processamento e análise.
Este tutorial é destinado a administradores de plataformas, arquitetos de nuvem e profissionais de operações interessados em implantar clusters Kafka altamente disponíveis no Google Kubernetes Engine (GKE).
Criar a infraestrutura do cluster
Nesta seção, você vai executar um script do Terraform para criar dois
clusters regionais do GKE.
O cluster primário será implantado em us-central1
.
Para criar o cluster, siga estas etapas:
Piloto automático
No Cloud Shell, execute os seguintes comandos:
terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID
Quando solicitado, digite yes
.
Padrão
No Cloud Shell, execute os seguintes comandos:
terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID
Quando solicitado, digite yes
.
Os arquivos de configuração do Terraform criam os seguintes recursos para implantar a infraestrutura:
- Crie um repositório do Artifact Registry para armazenar as imagens do Docker.
- Crie a rede VPC e a sub-rede da interface de rede da VM.
- Crie dois clusters do GKE.
O Terraform cria um cluster particular nas duas regiões e ativa o Backup para GKE para a recuperação de desastres.
Implantar Kafka no cluster
Nesta seção, você implantará o Kafka no GKE usando um gráfico do Helm. A operação cria os seguintes recursos:
- Os StatefulSets Kafka e Zookeeper.
- Uma implantação de exportador Kafka. O exportador coleta métricas do Kafka para o consumo do Prometheus.
- Um orçamento de interrupção de pods (PDB) que limita o número de pods off-line durante uma interrupção voluntária.
Para usar o gráfico do Helm para implantar o Kafka, siga estas etapas:
Configurar o acesso ao Docker
gcloud auth configure-docker us-docker.pkg.dev
Preencha o Artifact Registry com as imagens do Kafka e do Zookeeper.
./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0 ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52 ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41 ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
Configure o acesso da linha de comando
kubectl
ao cluster principal.gcloud container clusters get-credentials gke-kafka-us-central1 \ --location=${REGION} \ --project=${PROJECT_ID}
Crie um namespace.
export NAMESPACE=kafka kubectl create namespace $NAMESPACE
Instale o Kafka usando a versão 20.0.6 do gráfico do Helm.
cd helm ../scripts/chart.sh kafka 20.0.6 && \ rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka . \ --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
O resultado será assim:
NAME: kafka LAST DEPLOYED: Thu Feb 16 03:29:39 2023 NAMESPACE: kafka STATUS: deployed REVISION: 1 TEST SUITE: None
Verifique se as réplicas do Kafka estão em execução. Isso pode levar alguns minutos.
kubectl get all -n kafka
O resultado será assim:
--- NAME READY STATUS RESTARTS AGE pod/kafka-0 1/1 Running 2 (3m51s ago) 4m28s pod/kafka-1 1/1 Running 3 (3m41s ago) 4m28s pod/kafka-2 1/1 Running 2 (3m57s ago) 4m28s pod/kafka-zookeeper-0 1/1 Running 0 4m28s pod/kafka-zookeeper-1 1/1 Running 0 4m28s pod/kafka-zookeeper-2 1/1 Running 0 4m28s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kafka ClusterIP 192.168.112.124 <none> 9092/TCP 4m29s service/kafka-app ClusterIP 192.168.75.57 <none> 9092/TCP 35m service/kafka-app-headless ClusterIP None <none> 9092/TCP,9093/TCP 35m service/kafka-app-zookeeper ClusterIP 192.168.117.102 <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-app-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-headless ClusterIP None <none> 9092/TCP,9093/TCP 4m29s service/kafka-zookeeper ClusterIP 192.168.89.249 <none> 2181/TCP,2888/TCP,3888/TCP 4m29s service/kafka-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 4m29s NAME READY AGE statefulset.apps/kafka 3/3 4m29s statefulset.apps/kafka-zookeeper 3/3 4m29s
Crie dados de teste
Nesta seção, você vai testar o aplicativo Kafka e gerar mensagens.
Criar um pod cliente do cliente para interagir com o aplicativo Kafka.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
Crie um tópico chamado
topic1
com três partições e um fator de replicação de três.kafka-topics.sh \ --create \ --topic topic1 \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Verifique se as partições de tópico estão replicadas nos três agentes.
kafka-topics.sh \ --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
O resultado será assim:
Topic: topic1 TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic1 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
No exemplo de saída, observe que
topic1
tem três partições, cada uma com um líder e um conjunto de réplicas diferentes. Isso ocorre porque o Kafka usa o particionamento para distribuir os dados entre vários agentes, permitindo maior escalonabilidade e tolerância a falhas. O fator de replicação de três garante que cada partição tenha três réplicas, para que os dados ainda estejam disponíveis, mesmo que um ou dois agentes falhem.Execute o comando a seguir para gerar números de mensagem em massa em
topic1
.ALLOW_PLAINTEXT_LISTENER=yes for x in $(seq 0 200); do echo "$x: Message number $x" done | kafka-console-producer.sh \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \ --property parse.key=true \ --property key.separator=":"
Execute o comando a seguir para consumir
topic1
de todas as partições.kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
Digite
CTRL+C
para interromper o processo do consumidor.
Comparativo de mercado do Kafka
Para modelar com precisão um caso de uso, execute uma simulação da carga esperada no cluster. Para testar a performance, use as ferramentas
incluídas no pacote Kafka, ou seja, os scripts kafka-producer-perf-test.sh
e
kafka-consumer-perf-test.sh
na pasta bin
.
Crie um tópico para comparação.
kafka-topics.sh \ --create \ --topic topic-benchmark \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Criar carga no cluster do Kafka.
KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \ --topic topic-benchmark \ --num-records 10000000 \ --throughput -1 \ --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \ batch.size=16384 \ acks=all \ linger.ms=500 \ compression.type=none \ --record-size 100 \ --print-metrics
O produtor vai gerar 10.000.000 registros em
topic-benchmark
. A resposta será parecida com esta:623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency. 1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency. 1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency. 2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
Depois que todos os registros forem enviados, serão exibidas outras métricas na saída, como a seguir:
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark} : 173316.233 producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark} : 10000000.000
Para sair da exibição, digite
CTRL + C
.Saia do shell do pod.
exit
Gerenciar upgrades
As atualizações de versão do Kafka e do Kubernetes são lançadas regularmente. Siga as práticas recomendadas operacionais para atualizar o ambiente de software regularmente.
Planejar upgrades binários do Kafka
Nesta seção, você atualizará a imagem do Kafka usando o Helm e verificará se os temas ainda estão disponíveis.
Para fazer upgrade da versão anterior do Kafka do gráfico Helm que você usou em Implantar o Kafka no seu cluster, siga estas etapas:
Preencha o Artifact Registry com a seguinte imagem:
../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2 ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61 ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49 ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
Siga estas etapas para implantar um gráfico do Helm com as imagens atualizadas do Kafka e do Zookeeper. Para orientações específicas da versão, consulte as instruções do Kafka para upgrades de versão.
- Atualize a versão da dependência
Chart.yaml
:
../scripts/chart.sh kafka 20.1.0
Implante o gráfico do Helm com as novas imagens do Kafka e do Zookeeper, conforme mostrado no seguinte exemplo:
rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka ./ \ --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
Acompanhe o upgrade dos pods do Kafka:
kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
Para sair da exibição, digite
CTRL + C
.- Atualize a versão da dependência
Conectar-se ao cluster do Kafka usando um pod cliente.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
Verifique se você pode acessar mensagens de
topic1
.kafka-console-consumer.sh \ --topic topic1 \ --from-beginning \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
A saída vai mostrar as mensagens geradas na etapa anterior. Digite
CTRL+C
para sair do processo.Saia do pod cliente.
exit
Prepare-se para a recuperação de desastres
Para garantir que suas cargas de trabalho de produção permaneçam disponíveis no caso de um evento de interrupção de serviço, prepare um plano de recuperação de desastres (DR). Para saber mais sobre o planejamento de DR, consulte o Guia de planejamento de recuperação de desastres.
Para fazer backup e restaurar suas cargas de trabalho em clusters do GKE, use o Backup para o GKE.
Exemplo de cenário de backup e restauração do Kafka
Nesta seção, você fará um backup do cluster em gke-kafka-us-central1
e o restaurará em gke-kafka-us-west1
. Você realizará a operação de backup
e restauração no escopo do aplicativo usando o
recurso personalizado ProtectedApplication
.
O diagrama a seguir ilustra os componentes da solução de recuperação de desastres e como eles se relacionam.
Para se preparar para o backup e a restauração do cluster do Kafka, siga estas etapas:
Configurar as variáveis de ambiente
export BACKUP_PLAN_NAME=kafka-protected-app export BACKUP_NAME=protected-app-backup-1 export RESTORE_PLAN_NAME=kafka-protected-app export RESTORE_NAME=protected-app-restore-1 export REGION=us-central1 export DR_REGION=us-west1 export CLUSTER_NAME=gke-kafka-$REGION export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
Verifique se o cluster está em um estado
RUNNING
.gcloud container clusters describe $CLUSTER_NAME --location us-central1 --format='value(status)'
Crie um plano de backup.
gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \ --selected-applications=kafka/kafka,kafka/zookeeper \ --include-secrets \ --include-volume-data \ --cron-schedule="0 3 * * *" \ --backup-retain-days=7 \ --backup-delete-lock-days=0
Crie um backup manualmente. Embora os backups programados geralmente sejam regidos pelo cron-schedule no plano de backup, o exemplo a seguir mostra como iniciar uma operação de backup única.
gcloud beta container backup-restore backups create $BACKUP_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=$BACKUP_PLAN_NAME \ --wait-for-completion
Crie um plano de restauração.
gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \ --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \ --cluster-resource-conflict-policy=use-existing-version \ --namespaced-resource-restore-mode=delete-and-restore \ --volume-data-restore-policy=restore-volume-data-from-backup \ --selected-applications=kafka/kafka,kafka/zookeeper \ --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
Restaure manualmente de um backup.
gcloud beta container backup-restore restores create $RESTORE_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --restore-plan=$RESTORE_PLAN_NAME \ --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
Veja o aplicativo restaurado aparecer no cluster de backup. Pode levar alguns minutos para que todos os pods estejam em execução e prontos.
gcloud container clusters get-credentials gke-kafka-us-west1 \ --location us-west1 kubectl get pod -n kafka --watch
Digite
CTRL+C
para sair da exibição quando todos os pods estiverem em execução.Valide se os temas anteriores podem ser buscados por um consumidor.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
O resultado será assim:
192 : Message number 192 193 : Message number 193 197 : Message number 197 200 : Message number 200 Processed a total of 201 messages
Digite
CTRL+C
para sair do processo.Saia do pod.
exit
Simular uma interrupção do serviço Kafka
Nesta seção, você simulará uma falha de nó substituindo um nó do Kubernetes que hospeda o agente. Esta seção se aplica somente ao padrão. O Autopilot gerencia os nós para você, então a falha não pode ser simulada.
Criar um pod de cliente para se conectar ao aplicativo Kafka.
kubectl run kafka-client -n kafka --restart='Never' -it \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
Crie o tópico
topic-failover-test
e gere tráfego de teste.kafka-topics.sh \ --create \ --topic topic-failover-test \ --partitions 1 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Determine qual agente é o líder do tópico
topic-failover-test
.kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
O resultado será assim:
Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Na saída acima,
Leader: 1
significa que o líder detopic-failover-test
é agente 1. Corresponde ao podkafka-1
.Abra um novo terminal e conecte-se ao mesmo cluster.
gcloud container clusters get-credentials gke-kafka-us-west1 --location us-west1 --project PROJECT_ID
Descubra em qual nó o pod
kafka-1
está sendo executado.kubectl get pod -n kafka kafka-1 -o wide
O resultado será assim:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 1 (35m ago) 36m 192.168.132.4 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 <none> <none>
Na saída acima, você vê que o pod
kafka-1
está em execução no nógke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
.Drene o nó para remover os pods.
kubectl drain NODE \ --delete-emptydir-data \ --force \ --ignore-daemonsets
Substitua NODE pelo pod do nó kafka-1 em execução. Neste exemplo, o nó é
gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
.O resultado será assim:
node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578 evicting pod kafka/kafka-1 evicting pod kube-system/kube-dns-7d48cb57b-j4d8f evicting pod kube-system/calico-typha-56995c8d85-5clph pod/calico-typha-56995c8d85-5clph evicted pod/kafka-1 evicted pod/kube-dns-7d48cb57b-j4d8f evicted node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
Descubra em qual nó o pod
kafka-1
está sendo executado.kubectl get pod -n kafka kafka-1 -o wide
A resposta será parecida com esta:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 0 2m49s 192.168.128.8 gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7 <none> <none>
Na saída acima, você vê que o aplicativo está em execução em um novo nó.
No terminal conectado ao pod
kafka-client
, determine qual agente é líder paratopic-failover-test
.kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
A resposta será parecida com esta:
Topic: topic-failover-test TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 0,2,1
No exemplo de saída, o líder ainda é 1. No entanto, ele está em execução em um novo nó.
Teste de falha do líder do Kafka
No Cloud Shell, conecte-se ao cliente Kafka e use
describe
para ver o líder escolhido para cada partição emtopic1
.kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
O resultado será assim:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: topic1 Partition: 1 Leader: 0 Replicas: 2,1,0 Isr: 0,2,1 Topic: topic1 Partition: 2 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1
No Cloud Shell, não conectado ao cliente Kafka, exclua o agente líder
kafka-0
para forçar uma nova eleição de líder. Exclua o índice que mapeia para um dos líderes na saída anterior.kubectl delete pod -n kafka kafka-0 --force
O resultado será assim:
pod "kafka-0" force deleted
No Cloud Shell conectado ao cliente Kafka e use
describe
para ver o líder escolhido.kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
O resultado será assim:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
Na saída, o novo líder para cada partição será alterado, se ele tiver sido atribuído ao líder que foi interrompido (
kafka-0
). Isso indica que o líder original foi substituído quando o pod foi excluído e recriado.