O Kafka é um sistema de mensagens de publicação/subscrição distribuído de código aberto para processar dados de streaming em tempo real, de alto volume e de alto débito. Pode usar o Kafka para criar pipelines de dados de streaming que movem dados de forma fiável em diferentes sistemas e aplicações para processamento e análise.
Este tutorial destina-se a administradores de plataformas, arquitetos da nuvem e profissionais de operações interessados na implementação de clusters Kafka altamente disponíveis no Google Kubernetes Engine (GKE).
Crie a infraestrutura do cluster
Nesta secção, vai executar um script do Terraform para criar dois clusters do GKE regionais.
O cluster principal vai ser implementado em us-central1
.
Para criar o cluster, siga estes passos:
Piloto automático
No Cloud Shell, execute os seguintes comandos:
terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID
Quando lhe for pedido, escreva yes
.
Standard
No Cloud Shell, execute os seguintes comandos:
terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID
Quando lhe for pedido, escreva yes
.
Os ficheiros de configuração do Terraform criam os seguintes recursos para implementar a sua infraestrutura:
- Crie um repositório do Artifact Registry para armazenar as imagens Docker.
- Crie a rede VPC e a sub-rede para a interface de rede da VM.
- Crie dois clusters do GKE.
O Terraform cria um cluster privado nas duas regiões e ativa a Cópia de segurança do GKE para recuperação de desastres.
Implemente o Kafka no seu cluster
Nesta secção, vai implementar o Kafka no GKE através de um gráfico Helm. A operação cria os seguintes recursos:
- Os StatefulSets do Kafka e do Zookeeper.
- Uma implementação do exportador do Kafka. O exportador recolhe métricas do Kafka para consumo do Prometheus.
- Um orçamento de interrupção de pods (PDB) que limita o número de pods offline durante uma interrupção voluntária.
Para usar o gráfico Helm para implementar o Kafka, siga estes passos:
Configure o acesso ao Docker.
gcloud auth configure-docker us-docker.pkg.dev
Preencha o Artifact Registry com as imagens do Kafka e do Zookeeper.
./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0 ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52 ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41 ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
Configure o acesso à linha de comandos
kubectl
ao cluster principal.gcloud container clusters get-credentials gke-kafka-us-central1 \ --location=${REGION} \ --project=${PROJECT_ID}
Crie um espaço de nomes.
export NAMESPACE=kafka kubectl create namespace $NAMESPACE
Instale o Kafka através da versão 20.0.6 do gráfico Helm.
cd helm ../scripts/chart.sh kafka 20.0.6 && \ rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka . \ --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
O resultado é semelhante ao seguinte:
NAME: kafka LAST DEPLOYED: Thu Feb 16 03:29:39 2023 NAMESPACE: kafka STATUS: deployed REVISION: 1 TEST SUITE: None
Verifique se as réplicas do Kafka estão em execução (esta ação pode demorar alguns minutos).
kubectl get all -n kafka
O resultado é semelhante ao seguinte:
--- NAME READY STATUS RESTARTS AGE pod/kafka-0 1/1 Running 2 (3m51s ago) 4m28s pod/kafka-1 1/1 Running 3 (3m41s ago) 4m28s pod/kafka-2 1/1 Running 2 (3m57s ago) 4m28s pod/kafka-zookeeper-0 1/1 Running 0 4m28s pod/kafka-zookeeper-1 1/1 Running 0 4m28s pod/kafka-zookeeper-2 1/1 Running 0 4m28s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kafka ClusterIP 192.168.112.124 <none> 9092/TCP 4m29s service/kafka-app ClusterIP 192.168.75.57 <none> 9092/TCP 35m service/kafka-app-headless ClusterIP None <none> 9092/TCP,9093/TCP 35m service/kafka-app-zookeeper ClusterIP 192.168.117.102 <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-app-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-headless ClusterIP None <none> 9092/TCP,9093/TCP 4m29s service/kafka-zookeeper ClusterIP 192.168.89.249 <none> 2181/TCP,2888/TCP,3888/TCP 4m29s service/kafka-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 4m29s NAME READY AGE statefulset.apps/kafka 3/3 4m29s statefulset.apps/kafka-zookeeper 3/3 4m29s
Crie dados de teste
Nesta secção, vai testar a aplicação Kafka e gerar mensagens.
Crie um pod de cliente consumidor para interagir com a aplicação Kafka.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
Crie um tópico com o nome
topic1
com três partições e um fator de replicação de três.kafka-topics.sh \ --create \ --topic topic1 \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Verifique se as partições de tópicos são replicadas nos três agentes.
kafka-topics.sh \ --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
O resultado é semelhante ao seguinte:
Topic: topic1 TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic1 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
No exemplo de resultado, repare que
topic1
tem três partições, cada uma com um líder e um conjunto de réplicas diferentes. Isto deve-se ao facto de o Kafka usar a divisão em partições para distribuir os dados por vários agentes, o que permite uma maior escalabilidade e tolerância a falhas. O fator de replicação de três garante que cada partição tem três réplicas, para que os dados continuem disponíveis mesmo que um ou dois agentes falhem.Execute o seguinte comando para gerar números de mensagens em massa no formato
topic1
.ALLOW_PLAINTEXT_LISTENER=yes for x in $(seq 0 200); do echo "$x: Message number $x" done | kafka-console-producer.sh \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \ --property parse.key=true \ --property key.separator=":"
Execute o seguinte comando para consumir
topic1
de todas as partições.kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
Escreva
CTRL+C
para parar o processo do consumidor.
Teste de referência do Kafka
Para modelar com precisão um exemplo de utilização, pode executar uma simulação da carga esperada no cluster. Para testar o desempenho, vai usar as ferramentas incluídas no pacote Kafka, nomeadamente os scripts kafka-producer-perf-test.sh
e kafka-consumer-perf-test.sh
na pasta bin
.
Crie um tópico para testes de referência.
kafka-topics.sh \ --create \ --topic topic-benchmark \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Crie carga no cluster Kafka.
KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \ --topic topic-benchmark \ --num-records 10000000 \ --throughput -1 \ --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \ batch.size=16384 \ acks=all \ linger.ms=500 \ compression.type=none \ --record-size 100 \ --print-metrics
O produtor vai gerar 10 000 000 de registos a
topic-benchmark
. O resultado é semelhante ao seguinte:623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency. 1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency. 1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency. 2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
Depois de todos os registos terem sido enviados, deve ver métricas adicionais no resultado, semelhantes às seguintes:
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark} : 173316.233 producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark} : 10000000.000
Para sair do relógio, escreva
CTRL + C
.Saia da shell do Pod.
exit
Faça a gestão das atualizações
As atualizações de versões do Kafka e do Kubernetes são lançadas regularmente. Siga as práticas recomendadas operacionais para atualizar regularmente o seu ambiente de software.
Planeie atualizações binárias do Kafka
Nesta secção, vai atualizar a imagem do Kafka através do Helm e verificar se os seus tópicos ainda estão disponíveis.
Para atualizar a versão anterior do Kafka a partir do gráfico Helm que usou em Implemente o Kafka no seu cluster, siga estes passos:
Preencha o Artifact Registry com a seguinte imagem:
../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2 ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61 ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49 ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
Execute estes passos para implementar um gráfico Helm com as imagens atualizadas do Kafka e do Zookeeper. Para ver orientações específicas da versão, consulte as instruções do Kafka para atualizações de versões.
- Atualize a versão da dependência
Chart.yaml
:
../scripts/chart.sh kafka 20.1.0
Implemente o gráfico Helm com as novas imagens do Kafka e do Zookeeper, conforme mostrado no seguinte exemplo:
rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka ./ \ --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
Veja a atualização dos pods do Kafka:
kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
Para sair do relógio, escreva
CTRL + C
.- Atualize a versão da dependência
Ligue-se ao cluster do Kafka através de um pod de cliente.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
Confirme que consegue aceder às mensagens de
topic1
.kafka-console-consumer.sh \ --topic topic1 \ --from-beginning \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
O resultado deve mostrar as mensagens geradas no passo anterior. Escreva
CTRL+C
para sair do processo.Saia do pod do cliente.
exit
Prepare-se para a recuperação de desastres
Para garantir que as suas cargas de trabalho de produção permanecem disponíveis em caso de um evento que interrompa o serviço, deve preparar um plano de recuperação de desastres (RD). Para saber mais sobre o planeamento de recuperação de desastres, consulte o Guia de planeamento de recuperação de desastres.
Para fazer uma cópia de segurança e restaurar as suas cargas de trabalho em clusters do GKE, pode usar a cópia de segurança do GKE.
Exemplo de um cenário de cópia de segurança e restauro do Kafka
Nesta secção, vai fazer uma cópia de segurança do cluster a partir de gke-kafka-us-central1
e restaurar a cópia de segurança em gke-kafka-us-west1
. Vai realizar a operação de cópia de segurança e restauro ao nível da aplicação, usando o recurso personalizado ProtectedApplication
.
O diagrama seguinte ilustra os componentes da solução de recuperação de desastres e a respetiva relação.
Para se preparar para fazer uma cópia de segurança e restaurar o cluster Kafka, siga estes passos:
Configure as variáveis de ambiente.
export BACKUP_PLAN_NAME=kafka-protected-app export BACKUP_NAME=protected-app-backup-1 export RESTORE_PLAN_NAME=kafka-protected-app export RESTORE_NAME=protected-app-restore-1 export REGION=us-central1 export DR_REGION=us-west1 export CLUSTER_NAME=gke-kafka-$REGION export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
Verifique se o cluster está no estado
RUNNING
.gcloud container clusters describe $CLUSTER_NAME --location us-central1 --format='value(status)'
Crie um plano de cópia de segurança.
gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \ --selected-applications=kafka/kafka,kafka/zookeeper \ --include-secrets \ --include-volume-data \ --cron-schedule="0 3 * * *" \ --backup-retain-days=7 \ --backup-delete-lock-days=0
Crie manualmente uma cópia de segurança. Embora as cópias de segurança agendadas sejam normalmente regidas pelo agendamento cron no plano de cópia de segurança, o exemplo seguinte mostra como pode iniciar uma operação de cópia de segurança única.
gcloud beta container backup-restore backups create $BACKUP_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=$BACKUP_PLAN_NAME \ --wait-for-completion
Crie um plano de restauro.
gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \ --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \ --cluster-resource-conflict-policy=use-existing-version \ --namespaced-resource-restore-mode=delete-and-restore \ --volume-data-restore-policy=restore-volume-data-from-backup \ --selected-applications=kafka/kafka,kafka/zookeeper \ --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
Restaure manualmente a partir de uma cópia de segurança.
gcloud beta container backup-restore restores create $RESTORE_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --restore-plan=$RESTORE_PLAN_NAME \ --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
Veja a aplicação restaurada aparecer no cluster de cópia de segurança. Pode demorar alguns minutos até que todos os pods estejam em execução e prontos.
gcloud container clusters get-credentials gke-kafka-us-west1 \ --location us-west1 kubectl get pod -n kafka --watch
Escreva
CTRL+C
para sair do relógio quando todos os pods estiverem em funcionamento.Valide se os tópicos anteriores podem ser obtidos por um consumidor.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
O resultado é semelhante ao seguinte:
192 : Message number 192 193 : Message number 193 197 : Message number 197 200 : Message number 200 Processed a total of 201 messages
Escreva
CTRL+C
para sair do processo.Saia do Pod.
exit
Simule uma interrupção do serviço Kafka
Nesta secção, vai simular uma falha de nó substituindo um nó do Kubernetes que aloja o agente. Esta secção aplica-se apenas ao tipo Padrão. O Autopilot gere os seus nós, pelo que não é possível simular a falha de nós.
Crie um pod de cliente para estabelecer ligação à aplicação Kafka.
kubectl run kafka-client -n kafka --restart='Never' -it \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
Crie o tópico
topic-failover-test
e gere tráfego de teste.kafka-topics.sh \ --create \ --topic topic-failover-test \ --partitions 1 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Determine que agente é o líder do tópico
topic-failover-test
.kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
O resultado é semelhante ao seguinte:
Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Na saída acima,
Leader: 1
significa que o líder detopic-failover-test
é o agente 1. Isto corresponde ao Podkafka-1
.Abra um novo terminal e ligue-se ao mesmo cluster.
gcloud container clusters get-credentials gke-kafka-us-west1 --location us-west1 --project PROJECT_ID
Descubra em que nó o pod
kafka-1
está a ser executado.kubectl get pod -n kafka kafka-1 -o wide
O resultado é semelhante ao seguinte:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 1 (35m ago) 36m 192.168.132.4 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 <none> <none>
No resultado acima, vê que o pod
kafka-1
está a ser executado no nógke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
.Esvazie o nó para despejar os pods.
kubectl drain NODE \ --delete-emptydir-data \ --force \ --ignore-daemonsets
Substitua NODE pelo pod do nó em que o kafka-1 está a ser executado. Neste exemplo, o nó é
gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
.O resultado é semelhante ao seguinte:
node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578 evicting pod kafka/kafka-1 evicting pod kube-system/kube-dns-7d48cb57b-j4d8f evicting pod kube-system/calico-typha-56995c8d85-5clph pod/calico-typha-56995c8d85-5clph evicted pod/kafka-1 evicted pod/kube-dns-7d48cb57b-j4d8f evicted node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
Descubra em que nó o pod
kafka-1
está a ser executado.kubectl get pod -n kafka kafka-1 -o wide
O resultado deve ser semelhante ao seguinte:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 0 2m49s 192.168.128.8 gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7 <none> <none>
A partir do resultado acima, vê que a aplicação está a ser executada num novo nó.
No terminal ligado ao
kafka-client
pod, determine que agente é o líder paratopic-failover-test
.kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
O resultado deve ser semelhante ao seguinte:
Topic: topic-failover-test TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 0,2,1
No exemplo de saída, a variante líder continua a ser 1 . No entanto, agora está a ser executado num novo nó.
Teste de falha do líder do Kafka
No Cloud Shell, ligue-se ao cliente Kafka e use
describe
para ver o líder eleito para cada partição emtopic1
.kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
O resultado é semelhante ao seguinte:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: topic1 Partition: 1 Leader: 0 Replicas: 2,1,0 Isr: 0,2,1 Topic: topic1 Partition: 2 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1
No Cloud Shell não ligado ao cliente Kafka, elimine o agente principal
kafka-0
para forçar uma nova eleição de principal. Deve eliminar o índice que mapeia um dos líderes na saída anterior.kubectl delete pod -n kafka kafka-0 --force
O resultado é semelhante ao seguinte:
pod "kafka-0" force deleted
No Cloud Shell ligado ao cliente Kafka, use
describe
para ver o líder selecionado.kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
O resultado é semelhante ao seguinte:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
Na saída, o novo líder de cada partição muda, se tiver sido atribuído ao líder que foi interrompido (
kafka-0
). Isto indica que o líder original foi substituído quando o pod foi eliminado e recriado.