Kafka es un sistema de mensajería de publicación y suscripción distribuido y de código abierto para gestionar datos de streaming en tiempo real, de alto volumen y de alto rendimiento. Puedes usar Kafka para crear flujos de procesamiento de datos que transfieran datos de forma fiable entre diferentes sistemas y aplicaciones para procesarlos y analizarlos.
Este tutorial está dirigido a administradores de plataformas, arquitectos de la nube y profesionales de operaciones que quieran desplegar clústeres de Kafka de alta disponibilidad en Google Kubernetes Engine (GKE).
Crear la infraestructura del clúster
En esta sección, ejecutarás una secuencia de comandos de Terraform para crear dos clústeres de GKE regionales.
El clúster principal se desplegará en us-central1
.
Para crear el clúster, sigue estos pasos:
Autopilot
En Cloud Shell, ejecuta los siguientes comandos:
terraform -chdir=terraform/gke-autopilot init
terraform -chdir=terraform/gke-autopilot apply -var project_id=$PROJECT_ID
Cuando se te solicite, escribe yes
.
Estándar
En Cloud Shell, ejecuta los siguientes comandos:
terraform -chdir=terraform/gke-standard init
terraform -chdir=terraform/gke-standard apply -var project_id=$PROJECT_ID
Cuando se te solicite, escribe yes
.
Los archivos de configuración de Terraform crean los siguientes recursos para desplegar tu infraestructura:
- Crea un repositorio de Artifact Registry para almacenar las imágenes de Docker.
- Crea la red VPC y la subred de la interfaz de red de la VM.
- Crea dos clústeres de GKE.
Terraform crea un clúster privado en las dos regiones y habilita Copia de seguridad de GKE para la recuperación ante desastres.
Desplegar Kafka en un clúster
En esta sección, desplegarás Kafka en GKE mediante un gráfico de Helm. La operación crea los siguientes recursos:
- Los StatefulSets de Kafka y Zookeeper.
- Un despliegue de exportador de Kafka. El exportador recoge métricas de Kafka para que las use Prometheus.
- Un presupuesto de interrupciones de pods (PDB) que limita el número de pods sin conexión durante una interrupción voluntaria.
Para usar el gráfico de Helm e implementar Kafka, sigue estos pasos:
Configura el acceso a Docker.
gcloud auth configure-docker us-docker.pkg.dev
Rellena Artifact Registry con las imágenes de Kafka y Zookeeper.
./scripts/gcr.sh bitnami/kafka 3.3.2-debian-11-r0 ./scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r52 ./scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r41 ./scripts/gcr.sh bitnami/zookeeper 3.8.0-debian-11-r74
Configura el acceso a la línea de comandos
kubectl
al clúster principal.gcloud container clusters get-credentials gke-kafka-us-central1 \ --location=${REGION} \ --project=${PROJECT_ID}
Crea un espacio de nombres.
export NAMESPACE=kafka kubectl create namespace $NAMESPACE
Instala Kafka con la versión 20.0.6 del gráfico de Helm.
cd helm ../scripts/chart.sh kafka 20.0.6 && \ rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka . \ --set global.imageRegistry="us-docker.pkg.dev/$PROJECT_ID/main"
El resultado debería ser similar al siguiente:
NAME: kafka LAST DEPLOYED: Thu Feb 16 03:29:39 2023 NAMESPACE: kafka STATUS: deployed REVISION: 1 TEST SUITE: None
Verifica que tus réplicas de Kafka estén en ejecución (puede tardar unos minutos).
kubectl get all -n kafka
El resultado debería ser similar al siguiente:
--- NAME READY STATUS RESTARTS AGE pod/kafka-0 1/1 Running 2 (3m51s ago) 4m28s pod/kafka-1 1/1 Running 3 (3m41s ago) 4m28s pod/kafka-2 1/1 Running 2 (3m57s ago) 4m28s pod/kafka-zookeeper-0 1/1 Running 0 4m28s pod/kafka-zookeeper-1 1/1 Running 0 4m28s pod/kafka-zookeeper-2 1/1 Running 0 4m28s NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/kafka ClusterIP 192.168.112.124 <none> 9092/TCP 4m29s service/kafka-app ClusterIP 192.168.75.57 <none> 9092/TCP 35m service/kafka-app-headless ClusterIP None <none> 9092/TCP,9093/TCP 35m service/kafka-app-zookeeper ClusterIP 192.168.117.102 <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-app-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 35m service/kafka-headless ClusterIP None <none> 9092/TCP,9093/TCP 4m29s service/kafka-zookeeper ClusterIP 192.168.89.249 <none> 2181/TCP,2888/TCP,3888/TCP 4m29s service/kafka-zookeeper-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 4m29s NAME READY AGE statefulset.apps/kafka 3/3 4m29s statefulset.apps/kafka-zookeeper 3/3 4m29s
Crear datos de prueba
En esta sección, probarás la aplicación de Kafka y generarás mensajes.
Crea un pod de cliente consumidor para interactuar con la aplicación Kafka.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.3.2-debian-11-r0 -- bash
Crea un tema llamado
topic1
con tres particiones y un factor de replicación de tres.kafka-topics.sh \ --create \ --topic topic1 \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Verifica que las particiones del tema se repliquen en los tres brokers.
kafka-topics.sh \ --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
El resultado debería ser similar al siguiente:
Topic: topic1 TopicId: 1ntc4WiFS4-AUNlpr9hCmg PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic1 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
En el ejemplo de salida, observe que
topic1
tiene tres particiones, cada una con un líder y un conjunto de réplicas diferentes. Esto se debe a que Kafka usa particiones para distribuir los datos entre varios brokers, lo que permite una mayor escalabilidad y tolerancia a fallos. El factor de replicación de tres asegura que cada partición tenga tres réplicas, de modo que los datos sigan estando disponibles aunque fallen uno o dos brokers.Ejecuta el siguiente comando para generar números de mensajes en bloque en
topic1
.ALLOW_PLAINTEXT_LISTENER=yes for x in $(seq 0 200); do echo "$x: Message number $x" done | kafka-console-producer.sh \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092 \ --property parse.key=true \ --property key.separator=":"
Ejecuta el siguiente comando para consumir
topic1
de todas las particiones.kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
Escribe
CTRL+C
para detener el proceso del consumidor.
Comparar el rendimiento de Kafka
Para modelar con precisión un caso práctico, puedes simular la carga esperada en el clúster. Para probar el rendimiento, usarás las herramientas incluidas en el paquete de Kafka, es decir, las secuencias de comandos kafka-producer-perf-test.sh
y kafka-consumer-perf-test.sh
de la carpeta bin
.
Crea un tema para la comparativa.
kafka-topics.sh \ --create \ --topic topic-benchmark \ --partitions 3 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Crea carga en el clúster de Kafka.
KAFKA_HEAP_OPTS="-Xms4g -Xmx4g" kafka-producer-perf-test.sh \ --topic topic-benchmark \ --num-records 10000000 \ --throughput -1 \ --producer-props bootstrap.servers=kafka.kafka.svc.cluster.local:9092 \ batch.size=16384 \ acks=all \ linger.ms=500 \ compression.type=none \ --record-size 100 \ --print-metrics
El productor generará 10.000.000 de registros el
topic-benchmark
. La salida es similar a la siguiente:623821 records sent, 124316.7 records/sec (11.86 MB/sec), 1232.7 ms avg latency, 1787.0 ms max latency. 1235948 records sent, 247140.2 records/sec (23.57 MB/sec), 1253.0 ms avg latency, 1587.0 ms max latency. 1838898 records sent, 367779.6 records/sec (35.07 MB/sec), 793.6 ms avg latency, 1185.0 ms max latency. 2319456 records sent, 463242.7 records/sec (44.18 MB/sec), 54.0 ms avg latency, 321.0 ms max latency.
Una vez que se hayan enviado todos los registros, debería ver métricas adicionales en la salida, como las siguientes:
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=topic-benchmark} : 173316.233 producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=topic-benchmark} : 10000000.000
Para salir del reloj, escribe
CTRL + C
.Sal del shell del Pod.
exit
Gestionar licencias
Las actualizaciones de las versiones de Kafka y Kubernetes se publican de forma periódica. Sigue las prácticas recomendadas operativas para actualizar tu entorno de software con regularidad.
Planificar las actualizaciones binarias de Kafka
En esta sección, actualizarás la imagen de Kafka con Helm y verificarás que tus temas siguen disponibles.
Para actualizar la versión anterior de Kafka desde el gráfico de Helm que has usado en Desplegar Kafka en tu clúster, sigue estos pasos:
Rellena Artifact Registry con la siguiente imagen:
../scripts/gcr.sh bitnami/kafka 3.4.0-debian-11-r2 ../scripts/gcr.sh bitnami/kafka-exporter 1.6.0-debian-11-r61 ../scripts/gcr.sh bitnami/jmx-exporter 0.17.2-debian-11-r49 ../scripts/gcr.sh bitnami/zookeeper 3.8.1-debian-11-r0
Sigue estos pasos para implementar un gráfico de Helm con las imágenes actualizadas de Kafka y Zookeeper. Para obtener instrucciones específicas de una versión, consulta las instrucciones de Kafka para actualizar versiones.
- Actualiza la versión de la dependencia
Chart.yaml
:
../scripts/chart.sh kafka 20.1.0
Despliega el gráfico de Helm con las nuevas imágenes de Kafka y Zookeeper, tal como se muestra en el siguiente ejemplo:
rm -rf Chart.lock charts && \ helm dependency update && \ helm -n kafka upgrade --install kafka ./ \ --set global.imageRegistry="$REGION-docker.pkg.dev/$PROJECT_ID/main"
Observa cómo se actualizan los pods de Kafka:
kubectl get pod -l app.kubernetes.io/component=kafka -n kafka --watch
Para salir del reloj, escribe
CTRL + C
.- Actualiza la versión de la dependencia
Conéctate al clúster de Kafka mediante un pod de cliente.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0-debian-11-r2 -- bash
Comprueba que puedes acceder a los mensajes de
topic1
.kafka-console-consumer.sh \ --topic topic1 \ --from-beginning \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
En la salida se deben mostrar los mensajes generados en el paso anterior. Escribe
CTRL+C
para salir del proceso.Sal del pod del cliente.
exit
Prepararse para la recuperación tras fallos
Para asegurarte de que tus cargas de trabajo de producción sigan estando disponibles en caso de que se produzca un evento que interrumpa el servicio, debes preparar un plan de recuperación tras fallos. Para obtener más información sobre la planificación de la recuperación tras fallos, consulta la guía de planificación de la recuperación tras fallos.
Para crear copias de seguridad de tus cargas de trabajo en clústeres de GKE y restaurarlas, puedes usar Copia de seguridad de GKE.
.Ejemplo de situación de copia de seguridad y restauración de Kafka
En esta sección, crearás una copia de seguridad de tu clúster desde gke-kafka-us-central1
y la restaurarás en gke-kafka-us-west1
. Realizarás la operación de copia de seguridad y restauración en el ámbito de la aplicación, usando el ProtectedApplication
recurso personalizado.
En el siguiente diagrama se muestran los componentes de la solución de recuperación ante desastres y cómo se relacionan entre sí.
Para preparar la copia de seguridad y la restauración de tu clúster de Kafka, sigue estos pasos:
Configura las variables de entorno.
export BACKUP_PLAN_NAME=kafka-protected-app export BACKUP_NAME=protected-app-backup-1 export RESTORE_PLAN_NAME=kafka-protected-app export RESTORE_NAME=protected-app-restore-1 export REGION=us-central1 export DR_REGION=us-west1 export CLUSTER_NAME=gke-kafka-$REGION export DR_CLUSTER_NAME=gke-kafka-$DR_REGION
Verifica que el clúster esté en estado
RUNNING
.gcloud container clusters describe $CLUSTER_NAME --location us-central1 --format='value(status)'
Crea un plan de copia de seguridad.
gcloud beta container backup-restore backup-plans create $BACKUP_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --cluster=projects/$PROJECT_ID/locations/$REGION/clusters/$CLUSTER_NAME \ --selected-applications=kafka/kafka,kafka/zookeeper \ --include-secrets \ --include-volume-data \ --cron-schedule="0 3 * * *" \ --backup-retain-days=7 \ --backup-delete-lock-days=0
Crea una copia de seguridad manualmente. Aunque las copias de seguridad programadas se rigen por la programación cron del plan de copias de seguridad, en el siguiente ejemplo se muestra cómo puedes iniciar una operación de copia de seguridad única.
gcloud beta container backup-restore backups create $BACKUP_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=$BACKUP_PLAN_NAME \ --wait-for-completion
Crea un plan de restauración.
gcloud beta container backup-restore restore-plans create $RESTORE_PLAN_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --backup-plan=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME \ --cluster=projects/$PROJECT_ID/locations/$DR_REGION/clusters/$DR_CLUSTER_NAME \ --cluster-resource-conflict-policy=use-existing-version \ --namespaced-resource-restore-mode=delete-and-restore \ --volume-data-restore-policy=restore-volume-data-from-backup \ --selected-applications=kafka/kafka,kafka/zookeeper \ --cluster-resource-scope-selected-group-kinds="storage.k8s.io/StorageClass"
Restaurar manualmente a partir de una copia de seguridad.
gcloud beta container backup-restore restores create $RESTORE_NAME \ --project=$PROJECT_ID \ --location=$DR_REGION \ --restore-plan=$RESTORE_PLAN_NAME \ --backup=projects/$PROJECT_ID/locations/$DR_REGION/backupPlans/$BACKUP_PLAN_NAME/backups/$BACKUP_NAME
Observa cómo aparece la aplicación restaurada en el clúster de copia de seguridad. Puede que tarden unos minutos en ejecutarse y estar listos todos los pods.
gcloud container clusters get-credentials gke-kafka-us-west1 \ --location us-west1 kubectl get pod -n kafka --watch
Escribe
CTRL+C
para salir del reloj cuando todos los pods estén en funcionamiento.Valida que un consumidor pueda obtener los temas anteriores.
kubectl run kafka-client -n kafka --rm -ti \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
kafka-console-consumer.sh \ --bootstrap-server kafka.kafka.svc.cluster.local:9092 \ --topic topic1 \ --property print.key=true \ --property key.separator=" : " \ --from-beginning;
El resultado debería ser similar al siguiente:
192 : Message number 192 193 : Message number 193 197 : Message number 197 200 : Message number 200 Processed a total of 201 messages
Escribe
CTRL+C
para salir del proceso.Salir del Pod.
exit
Simular una interrupción del servicio de Kafka
En esta sección, simularás un fallo de nodo sustituyendo un nodo de Kubernetes que aloja el broker. Esta sección solo se aplica a la edición Standard. Autopilot gestiona los nodos por ti, por lo que no se puede simular un fallo de nodo.
Crea un pod de cliente para conectarte a la aplicación Kafka.
kubectl run kafka-client -n kafka --restart='Never' -it \ --image us-docker.pkg.dev/$PROJECT_ID/main/bitnami/kafka:3.4.0 -- bash
Crea el tema
topic-failover-test
y genera tráfico de prueba.kafka-topics.sh \ --create \ --topic topic-failover-test \ --partitions 1 \ --replication-factor 3 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
Determina qué broker es el líder del tema
topic-failover-test
.kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
El resultado debería ser similar al siguiente:
Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
En el resultado anterior,
Leader: 1
significa que el líder detopic-failover-test
es el broker 1. Corresponde al podkafka-1
.Abre un nuevo terminal y conéctate al mismo clúster.
gcloud container clusters get-credentials gke-kafka-us-west1 --location us-west1 --project PROJECT_ID
Busca en qué nodo se está ejecutando el pod
kafka-1
.kubectl get pod -n kafka kafka-1 -o wide
El resultado debería ser similar al siguiente:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 1 (35m ago) 36m 192.168.132.4 gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 <none> <none>
En el resultado anterior, se muestra que el pod
kafka-1
se está ejecutando en el nodogke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
.Drena el nodo para desalojar los pods.
kubectl drain NODE \ --delete-emptydir-data \ --force \ --ignore-daemonsets
Sustituye NODE por el nodo en el que se ejecuta el pod kafka-1. En este ejemplo, el nodo es
gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72
.El resultado debería ser similar al siguiente:
node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 cordoned Warning: ignoring DaemonSet-managed Pods: gmp-system/collector-gjzsd, kube-system/calico-node-t28bj, kube-system/fluentbit-gke-lxpft, kube-system/gke-metadata-server-kxw78, kube-system/ip-masq-agent-kv2sq, kube-system/netd-h446k, kube-system/pdcsi-node-ql578 evicting pod kafka/kafka-1 evicting pod kube-system/kube-dns-7d48cb57b-j4d8f evicting pod kube-system/calico-typha-56995c8d85-5clph pod/calico-typha-56995c8d85-5clph evicted pod/kafka-1 evicted pod/kube-dns-7d48cb57b-j4d8f evicted node/gke-gke-kafka-us-west1-pool-system-a0d0d395-nx72 drained
Busca en qué nodo se está ejecutando el pod
kafka-1
.kubectl get pod -n kafka kafka-1 -o wide
La salida debería ser similar a la siguiente:
NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES kafka-1 2/2 Running 0 2m49s 192.168.128.8 gke-gke-kafka-us-west1-pool-kafka-700d8e8d-05f7 <none> <none>
En el resultado anterior, se puede ver que la aplicación se está ejecutando en un nuevo nodo.
En el terminal conectado al pod
kafka-client
, determina qué broker es el líder detopic-failover-test
.kafka-topics.sh --describe \ --topic topic-failover-test \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
La salida debería ser similar a la siguiente:
Topic: topic-failover-test TopicId: bemKyqmERAuKZC5ymFwsWg PartitionCount: 1 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic-failover-test Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 0,2,1
En el ejemplo de salida, el líder sigue siendo 1 . Sin embargo, ahora se ejecuta en un nuevo nodo.
Prueba de error de líder de Kafka
En Cloud Shell, conéctate al cliente de Kafka y usa
describe
para ver el líder elegido de cada partición entopic1
.kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
El resultado debería ser similar al siguiente:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: topic1 Partition: 1 Leader: 0 Replicas: 2,1,0 Isr: 0,2,1 Topic: topic1 Partition: 2 Leader: 0 Replicas: 1,0,2 Isr: 0,2,1
En Cloud Shell, que no está conectado al cliente de Kafka, elimina el broker líder
kafka-0
para forzar una nueva elección de líder. Debes eliminar el índice que se asigna a uno de los líderes en el resultado anterior.kubectl delete pod -n kafka kafka-0 --force
El resultado debería ser similar al siguiente:
pod "kafka-0" force deleted
En Cloud Shell conectado al cliente de Kafka, usa
describe
para ver el líder elegido.kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server kafka-headless.kafka.svc.cluster.local:9092
El resultado debería ser similar al siguiente:
Topic: topic1 TopicId: B3Jr_5t2SPq7F1jVHu4r0g PartitionCount: 3 ReplicationFactor: 3 Configs: flush.ms=1000,segment.bytes=1073741824,flush.messages=10000,max.message.bytes=1000012,retention.bytes=1073741824 Topic: topic1 Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2,0,1 Topic: topic1 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 Topic: topic1 Partition: 2 Leader: 2 Replicas: 1,2,0 Isr: 2,0,1
En el resultado, el nuevo líder de cada partición cambia si se ha asignado al líder que se ha interrumpido (
kafka-0
). Esto indica que el líder original se ha sustituido cuando se ha eliminado y se ha vuelto a crear el pod.