En la guía, se muestra cómo usar el operador Strimzi para implementar clústeres de Apache Kafka.
Kafka es un sistema de mensajería distribuido de código abierto diseñado para controlar datos de transmisión de gran volumen, alta capacidad de procesamiento y transmisión en tiempo real. Te permite compilar canalizaciones de datos de transmisión para una transferencia de datos confiable en diferentes sistemas y aplicaciones a fin de admitir las tareas de análisis y procesamiento.
Los operadores son extensiones de software que usan recursos personalizados para administrar aplicaciones y sus componentes. Para obtener más información sobre la motivación para usar operadores, consulta Patrón de operadores en la documentación de Kubernetes de código abierto. El operador de Strimzi ofrece flexibilidad en las opciones de implementación y te permite usar taints y tolerancias de Kubernetes para ejecutar Kafka en nodos dedicados.
Esta guía está dirigida a administradores de plataformas, arquitectos de nube y profesionales de operaciones interesados en implementar clústeres de Kafka en GKE.
Esta solución es un buen punto de partida si deseas aprender a implementar clústeres de Kafka con un operador de terceros para automatizar la administración y reducir los errores. Si prefieres un control operativo más detallado, consulta Implementa un clúster de Kafka con alta disponibilidad en GKE.
Prepare el entorno
En este instructivo, usarás Cloud Shell para administrar recursos alojados en Google Cloud. Cloud Shell tiene preinstalado el software que necesitas para este instructivo, incluido kubectl
, la CLI de gcloud, Helm y Terraform.
Para configurar tu entorno con Cloud Shell, sigue estos pasos:
Para iniciar una sesión de Cloud Shell desde la Google Cloud consola, haz clic en
Activar Cloud Shell en la Google Cloud consola. Esto inicia una sesión en el panel inferior de la consola de Google Cloud .
Establece las variables de entorno:
export PROJECT_ID=PROJECT_ID export KUBERNETES_CLUSTER_PREFIX=kafka export REGION=us-central1
Reemplaza
PROJECT_ID
: tu Google Cloud por tuID del proyecto.Clona el repositorio de GitHub:
git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
Cambia al directorio de trabajo:
cd kubernetes-engine-samples/streaming/
Crea la infraestructura del clúster
En esta sección, debes ejecutar una secuencia de comandos de Terraform para crear un clúster de GKE privado y con alta disponibilidad. Los siguientes pasos permiten el acceso público al plano de control. Para restringir el acceso, crea un clúster privado.
Puedes instalar el operador mediante un clúster de Standard o Autopilot.
Estándar
En el siguiente diagrama, se muestra un clúster de GKE estándar regional privado implementado en tres zonas diferentes:
Para implementar esta infraestructura, ejecuta los siguientes comandos desde Cloud Shell:
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
-var region=${REGION} \
-var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
Cuando se te solicite, escribe yes
. Es posible que este comando tarde varios minutos en completarse y que el clúster muestre un estado de preparación.
Terraform crea los siguientes recursos:
- Una red de VPC y una subred privada para los nodos de Kubernetes.
- Un router para acceder a Internet a través de NAT.
- Un clúster de GKE privado en la región
us-central1
. - 2 grupos de nodos con ajuste de escala automático habilitado (de 1 a 2 nodos por zona y 1 nodo por zona como mínimo)
- Un
ServiceAccount
con permisos de registro y supervisión. - Copia de seguridad para GKE para la recuperación ante desastres.
- Google Cloud Managed Service para Prometheus para la supervisión de clústeres.
El resultado es similar a este:
...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.
Outputs:
kubectl_connection_command = "gcloud container clusters get-credentials strimzi-cluster --region us-central1"
Autopilot
En el siguiente diagrama, se muestra un clúster de GKE de Autopilot regional privado:
Para implementar la infraestructura, ejecuta los siguientes comandos desde Cloud Shell:
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
-var region=${REGION} \
-var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
Cuando se te solicite, escribe yes
. Es posible que este comando tarde varios minutos en completarse y que el clúster muestre un estado de preparación.
Terraform crea los siguientes recursos:
- La red de VPC y la subred privada para los nodos de Kubernetes.
- Un router para acceder a Internet a través de NAT.
- Un clúster de GKE privado en la región
us-central1
. - Un
ServiceAccount
con permisos de registro y supervisión - Google Cloud Managed Service para Prometheus para la supervisión de clústeres.
El resultado es similar a este:
...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.
Outputs:
kubectl_connection_command = "gcloud container clusters get-credentials strimzi-cluster --region us-central1"
Conéctate al clúster
Configura kubectl
para comunicarse con el clúster:
gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}
Implementa el operador de Strimzi en tu clúster
En esta sección, implementarás el operador de Strimzi mediante un gráfico de Helm. También hay varias formas más de implementar Strimzi.
Agrega el repositorio de gráfico de Helmzi:
helm repo add strimzi https://strimzi.io/charts/
Agrega un espacio de nombres para el Operador Strimzi y el clúster de Kafka:
kubectl create ns kafka
Implementa el operador del clúster de Strimzi con Helm:
helm install strimzi-operator strimzi/strimzi-kafka-operator -n kafka
Para implementar el operador de clústeres de Strimzi y los clústeres de Kafka en diferentes espacios de nombres, agrega el parámetro
--set watchNamespaces="{kafka-namespace,kafka-namespace-2,...}"
al comando de helm.Verifica que el operador de clústeres de Strimzi se haya implementado de forma correcta mediante Helm:
helm ls -n kafka
El resultado es similar a este:
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION strimzi-operator kafka 1 2023-06-27 11:22:15.850545 +0200 CEST deployed strimzi-kafka-operator-0.35.0 0.35.0
Implementa Kafka
Después de implementar el operador en el clúster, estás listo para implementar una instancia de clúster de Kafka.
En esta sección, implementarás Kafka en una configuración básica y, luego, probarás varias situaciones de configuración avanzada para abordar los requisitos de disponibilidad, seguridad y observabilidad.
Configuración básica
La configuración básica de la instancia de Kafka incluye los siguientes componentes:
- Tres réplicas de agentes de Kafka, con un mínimo de dos réplicas disponibles para la coherencia del clúster.
- Tres réplicas de los nodos de ZooKeeper, que forman un clúster.
- Dos objetos de escucha de Kafka: uno sin autenticación y otro con autenticación de TLS con un certificado generado por Strimzi.
- MaxHeapSize y MinHeapSize de Java configurados en 4 GB para Kafka y 2 GB para ZooKeeper.
- Asignación de recursos de CPU de 1 solicitud de CPU y límites de 2 CPU para Kafka y ZooKeeper, junto con 5 GB de solicitudes de memoria y límites para Kafka (4 GB para el servicio principal y 0.5 GB para el exportador de métricas) y 2.5 GB para ZooKeeper (2 GB para el servicio principal y 0.5 GB para el exportador de métricas).
- Operador de entidades con las solicitudes y límites siguientes:
tlsSidecar
: 100 m/500 m de CPU y 128 Mi memoria.topicOperator
: 100 m/500 m de CPU y 512 Mi memoria.userOperator
: 500 m de CPU y 2 Gi de memoria.
- 100 GB de almacenamiento asignado a cada Pod mediante la
storageClass
premium-rwo
. - Tolerancias, nodeAffinities y podAntiAffinities configuradas para cada carga de trabajo, lo que garantiza una distribución adecuada entre nodos, con sus respectivos grupos de nodos y zonas diferentes.
- Comunicación dentro del clúster protegida por certificados autofirmados: autoridades certificadoras (CA) independientes para el clúster y los clientes (mTLS). También puedes configurar el uso de una autoridad certificadora diferente.
Esta configuración representa la configuración mínima necesaria para crear un clúster de Kafka listo para la producción. En las siguientes secciones, se muestran configuraciones personalizadas para abordar aspectos como la seguridad del clúster, las listas de control de acceso (LCA), la administración de temas, la administración de certificados y mucho más.
Crea un clúster básico de Kafka
Crea un clúster de Kafka nuevo mediante la configuración básica:
kubectl apply -n kafka -f kafka-strimzi/manifests/01-basic-cluster/my-cluster.yaml
Con este comando, se crea un recurso personalizado de Kafka del operador Strimzi que incluye solicitudes y límites de CPU y memoria, solicitudes de almacenamiento en bloque y una combinación de taints y afinidades para distribuir los Pods aprovisionados en los nodos de Kubernetes.
Espera unos minutos mientras Kubernetes inicia las cargas de trabajo requeridas:
kubectl wait kafka/my-cluster --for=condition=Ready --timeout=600s -n kafka
Verifica que se hayan creado las cargas de trabajo de Kafka:
kubectl get pod,service,deploy,pdb -l=strimzi.io/cluster=my-cluster -n kafka
El resultado es similar a este:
NAME READY STATUS RESTARTS AGE pod/my-cluster-entity-operator-848698874f-j5m7f 3/3 Running 0 44m pod/my-cluster-kafka-0 1/1 Running 0 5m pod/my-cluster-kafka-1 1/1 Running 0 5m pod/my-cluster-kafka-2 1/1 Running 0 5m pod/my-cluster-zookeeper-0 1/1 Running 0 6m pod/my-cluster-zookeeper-1 1/1 Running 0 6m pod/my-cluster-zookeeper-2 1/1 Running 0 6m NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/my-cluster-kafka-bootstrap ClusterIP 10.52.8.80 <none> 9091/TCP,9092/TCP,9093/TCP 5m service/my-cluster-kafka-brokers ClusterIP None <none> 9090/TCP,9091/TCP,9092/TCP,9093/TCP 5m service/my-cluster-zookeeper-client ClusterIP 10.52.11.144 <none> 2181/TCP 6m service/my-cluster-zookeeper-nodes ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 6m NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/my-cluster-entity-operator 1/1 1 1 44m NAME MIN AVAILABLE MAX UNAVAILABLE ALLOWED DISRUPTIONS AGE poddisruptionbudget.policy/my-cluster-kafka 2 N/A 1 5m poddisruptionbudget.policy/my-cluster-zookeeper 2 N/A 1 6m
El operador crea los siguientes recursos:
- Dos
StrimziPodSets
para Kafka y ZooKeeper. - Tres Pods para las réplicas de agente de Kafka.
- Tres Pods para las réplicas de ZooKeeper.
- Dos
PodDisruptionBudgets
, lo que garantiza una disponibilidad mínima de dos réplicas para la coherencia del clúster. - Un Service llamado
my-cluster-kafka-bootstrap
, que funciona como el servidor de arranque para los clientes de Kafka que se conectan desde el clúster de Kubernetes. Todos los objetos de escucha internos de Kafka están disponibles en este servicio. - Un Service sin interfaz gráfica llamado
my-cluster-kafka-brokers
que habilita la resolución de DNS de las direcciones IP del Pod del agente de Kafka directamente. Este Service se usa para la comunicación entre los agentes. - Un Service llamado
my-cluster-zookeeper-client
que permite a los agentes de Kafka conectarse a los nodos de ZooKeeper como clientes. - Un Service sin interfaz gráfica llamado
my-cluster-zookeeper-nodes
que habilita la resolución de DNS de las direcciones IP del Pod de ZooKeeper directamente. Este servicio se usa para conectarse entre las réplicas de ZooKeeper. - Una implementación llamada
my-cluster-entity-operator
que contiene el operador de temas y de usuarios y facilita la administración de los recursos personalizadosKafkaTopics
yKafkaUsers
.
También puedes configurar dos NetworkPolicies
para facilitar la conectividad con los objetos de escucha de Kafka desde cualquier Pod y espacio de nombres. Estas políticas también restringirían las conexiones a ZooKeeper a los agentes y habilitarían la comunicación entre los Pods del clúster y los puertos internos del Service, que son exclusivos de la comunicación del clúster.
Autenticación y administración de usuarios
En esta sección, se muestra cómo habilitar la autenticación y la autorización para proteger los objetos de escucha de Kafka y compartir credenciales con los clientes.
Strimzi proporciona un método nativo de Kubernetes para la administración de usuarios mediante un User Operator
independiente y su recurso personalizado de Kubernetes correspondiente, KafkaUser
, que define la configuración del usuario. La configuración de usuarios incluye la configuración de la autenticación y la autorización, y aprovisiona el usuario correspondiente en Kafka.
Strimzi puede crear objetos de escucha y usuarios de Kafka que admitan varios mecanismos de autenticación, como la autenticación basada en nombre de usuario y contraseña (SCRAM-SHA-512) o TLS. También puedes usar la autenticación OAuth 2.0, que a menudo se considera un mejor enfoque en comparación con el uso de contraseñas o certificados para la autenticación debido a la seguridad y la administración de credenciales externas.
Implementa un clúster de Kafka
En esta sección, se muestra cómo implementar un operador de Strimzi que demuestra las capacidades de administración de usuarios, lo que incluye lo siguiente:
- Un clúster de Kafka con autenticación basada en contraseña (SCRAM-SHA-512) habilitada en uno de los objetos de escucha.
- Un
KafkaTopic
con 3 réplicas. - Un
KafkaUser
con una LCA que especifica que el usuario tiene permisos de lectura y escritura en el tema.
Configura tu clúster de Kafka para usar un objeto de escucha con autenticación SCRAM-SHA-512 basada en contraseña en el puerto 9094 y autorización simple:
kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/my-cluster.yaml
Crea un
Topic
,User
y un Pod cliente para ejecutar comandos en el clúster de Kafka:kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/topic.yaml kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/my-user.yaml
El
my-user
Secret
con las credenciales de usuario se activa en el Pod del cliente como un volumen.Estas credenciales confirman que el usuario tiene permisos para publicar mensajes en el tema mediante el objeto de escucha con la autenticación basada en contraseñas (SCRAM-SHA-512) habilitada.
Crea un Pod de cliente:
kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/kafkacat.yaml
Espera unos minutos a que el Pod cliente se convierta en
Ready
y, luego, conéctate a él:kubectl wait --for=condition=Ready pod --all -n kafka --timeout=600s kubectl exec -it kafkacat -n kafka -- /bin/sh
Genera un mensaje nuevo con credenciales
my-user
y trata de consumirlo:echo "Message from my-user" |kcat \ -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9094 \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=my-user \ -X sasl.password=$(cat /my-user/password) \ -t my-topic -P kcat -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9094 \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=SCRAM-SHA-512 \ -X sasl.username=my-user \ -X sasl.password=$(cat /my-user/password) \ -t my-topic -C
El resultado es similar a este:
Message from my-user % Reached end of topic my-topic [0] at offset 0 % Reached end of topic my-topic [2] at offset 1 % Reached end of topic my-topic [1] at offset 0
Escribe
CTRL+C
para detener el proceso del consumidor.Sal de la shell del Pod
exit
Copias de seguridad y recuperación ante desastres
Aunque el operador de Strimzi no ofrece una función de copia de seguridad integrada, puedes implementar estrategias de copia de seguridad eficientes si sigues ciertos patrones.
Puedes usar la Copia de seguridad para GKE para hacer una copia de seguridad:
- Manifiestos de recursos de Kubernetes.
- Recursos personalizados de la API de Strimzi y sus definiciones extraídas del servidor de APIs de Kubernetes del clúster que se somete a una copia de seguridad.
- Volúmenes que corresponden a los recursos PersistentVolumeClaim que se encuentran en los manifiestos.
A fin de obtener más información sobre cómo crear una copia de seguridad y restablecer los clústeres de Kafka con la copia de seguridad para GKE, consulta Prepárate para la recuperación ante desastres.
También puedes realizar una copia de seguridad de un clúster de Kafka que se implementó mediante el operador Strimzi. Debes crear una copia de seguridad:
- La configuración de Kafka, que incluye todos los recursos personalizados de la API de Strimzi, como
KafkaTopics
yKafkaUsers
. - Los datos, que se almacenan en los PersistentVolumes de los agentes de Kafka.
El almacenamiento de manifiestos de recursos de Kubernetes, incluidas las configuraciones de Strimzi, en los repositorios de Git, puede eliminar la necesidad de realizar una copia de seguridad por separado para la configuración de Kafka, ya que los recursos se pueden volver a aplicar a un clúster de Kubernetes nuevo cuando sea necesario.
A fin de proteger la recuperación de datos de Kafka en situaciones en las que se pierde una instancia del servidor de Kafka o un clúster de Kubernetes en el que se implementa Kafka, te recomendamos que configures la clase de almacenamiento de Kubernetes que se usa para aprovisionar volúmenes para los agentes de Kafka con la opción reclaimPolicy
configurada como Retain
. También te recomendamos que tomes instantáneas de los volúmenes del agente de Kafka.
En el siguiente manifiesto, se describe una StorageClass que usa la opción Retain
de reclaimPolicy
:
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer
En el siguiente ejemplo, se muestra la StorageClass que se agregó al spec
de un recurso personalizado del clúster de Kafka:
# ...
spec:
kafka:
# ...
storage:
type: persistent-claim
size: 100Gi
class: premium-rwo-retain
Con esta configuración, los PersistentVolumes aprovisionados mediante la clase de almacenamiento no se borran, incluso cuando se borra la PersistentVolumeClaim correspondiente.
Para recuperar la instancia de Kafka en un clúster de Kubernetes nuevo con la configuración existente y los datos de la instancia del agente, sigue estos pasos:
- Aplica los recursos personalizados de Strimzi Kafka existentes (
Kakfa
,KafkaTopic
,KafkaUser
, etc.) a un clúster de Kubernetes nuevo - Actualiza las PersistentVolumeClaims con el nombre de las instancias nuevas del agente de Kafka a los PersistentVolumes anteriores mediante la propiedad
spec.volumeName
en PersistentVolumeClaim.