O guia mostra como usar o operador Confluent for Kubernetes (CFK) para implementar clusters do Apache Kafka no Google Kubernetes Engine (GKE).
O Kafka é um sistema de mensagens de publicação/subscrição distribuído e de código aberto para processar dados de streaming em tempo real, de alto volume e de alto débito. Pode usar o Kafka para criar pipelines de dados de streaming que movem dados de forma fiável em diferentes sistemas e aplicações para processamento e análise.
Este guia destina-se a administradores de plataformas, arquitetos da nuvem e profissionais de operações interessados na implementação de clusters Kafka no GKE.
Também pode usar o operador CFK para implementar outros componentes da Confluent Platform, como o centro de controlo da Confluent baseado na Web, o registo de esquemas ou o KsqlDB. No entanto, este guia foca-se apenas nas implementações do Kafka.
Prepare o ambiente
Neste tutorial, vai usar o
Cloud Shell
para gerir recursos alojados
no Google Cloud. O Cloud Shell está pré-instalado com o software
necessário para este tutorial, incluindo
kubectl
,
a
CLI gcloud,
Helm> e
Terraform.
Para configurar o seu ambiente com o Cloud Shell, siga estes passos:
Inicie uma sessão do Cloud Shell a partir da Google Cloud consola, clicando em
Ativar Cloud Shell na Google Cloud consola. Esta ação inicia uma sessão no painel inferior da Google Cloud consola.
Defina variáveis de ambiente:
export PROJECT_ID=PROJECT_ID export KUBERNETES_CLUSTER_PREFIX=kafka export REGION=us-central1
Substitua
PROJECT_ID
: your Google Cloud pelo seu ID do projeto.Clone o repositório do GitHub:
git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
Mude para o diretório de trabalho:
cd kubernetes-engine-samples/streaming
Crie a infraestrutura do cluster
Nesta secção, executa um script do Terraform para criar um cluster do GKE privado, de alta disponibilidade e regional. Os passos seguintes permitem o acesso público ao plano de controlo. Para restringir o acesso, crie um cluster privado.
Pode instalar o operador através de um cluster padrão ou do Autopilot.
Standard
O diagrama seguinte mostra um cluster GKE padrão regional privado implementado em três zonas diferentes:
Para implementar esta infraestrutura, execute os seguintes comandos a partir do Cloud Shell:
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
-var region=${REGION} \
-var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
Quando lhe for pedido, escreva yes
. Este comando pode demorar vários minutos
a ser concluído e o cluster a apresentar o estado pronto.
O Terraform cria os seguintes recursos:
- Uma rede VPC e uma sub-rede privada para os nós do Kubernetes.
- Um router para aceder à Internet através de NAT.
- Um cluster do GKE privado na região
us-central1
. - 2 conjuntos de nós com o dimensionamento automático ativado (1 a 2 nós por zona, mínimo de 1 nó por zona)
- Um
ServiceAccount
com autorizações de registo e monitorização. - Cópia de segurança do GKE para recuperação de desastres.
- Google Cloud Managed Service for Prometheus para monitorização de clusters.
O resultado é semelhante ao seguinte:
...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.
Outputs:
kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"
Piloto automático
O diagrama seguinte mostra um cluster do GKE Autopilot regional privado:
Para implementar a infraestrutura, execute os seguintes comandos a partir do Cloud Shell:
export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
-var region=${REGION} \
-var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}
Quando lhe for pedido, escreva yes
. Este comando pode demorar vários minutos
a ser concluído e o cluster a apresentar o estado pronto.
O Terraform cria os seguintes recursos:
- Rede VPC e sub-rede privada para os nós do Kubernetes.
- Um router para aceder à Internet através de NAT.
- Um cluster do GKE privado na região
us-central1
. - Um
ServiceAccount
com autorizações de registo e monitorização - Google Cloud Managed Service for Prometheus para monitorização de clusters.
O resultado é semelhante ao seguinte:
...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.
Outputs:
kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"
Estabeleça ligação ao cluster
Configure o kubectl
para comunicar com o cluster:
gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}
Implemente o operador CFK no seu cluster
Nesta secção, implementa o operador Confluent for Kubernetes (CFK) através de um gráfico Helm e, em seguida, implementa um cluster Kafka.
Adicione o repositório do gráfico Helm da Confluent:
helm repo add confluentinc https://packages.confluent.io/helm
Adicione um espaço de nomes para o operador CFK e o cluster Kafka:
kubectl create ns kafka
Implemente o operador do cluster do CFK através do Helm:
helm install confluent-operator confluentinc/confluent-for-kubernetes -n kafka
Para permitir que o CFK faça a gestão de recursos em todos os espaços de nomes, adicione o parâmetro
--set-namespaced=false
ao comando Helm.Verifique se o operador Confluent foi implementado com êxito usando o Helm:
helm ls -n kafka
O resultado é semelhante ao seguinte:
NAME NAMESPACE REVISION UPDATED STATUS CHART APP VERSION confluent-operator kafka 1 2023-07-07 10:57:45.409158 +0200 CEST deployed confluent-for-kubernetes-0.771.13 2.6.0
Implemente o Kafka
Nesta secção, implementa o Kafka numa configuração básica e, em seguida, experimenta vários cenários de configuração avançada para resolver os requisitos de disponibilidade, segurança e observabilidade.
Configuração básica
A configuração básica da instância do Kafka inclui os seguintes componentes:
- Três réplicas de agentes Kafka, com um mínimo de duas réplicas disponíveis necessárias para a consistência do cluster.
- Três réplicas de nós do ZooKeeper, que formam um cluster.
- Dois ouvintes do Kafka: um sem autenticação e outro que usa a autenticação TLS com um certificado gerado pelo CFK.
- MaxHeapSize e MinHeapSize do Java definidos como 4 GB para o Kafka.
- A atribuição de recursos da CPU de 1 pedido de CPU e 2 limites de CPU, e 5 GB de pedidos e limites de memória para o Kafka (4 GB para o serviço principal e 0,5 GB para o exportador de métricas) e 3 GB para o Zookeeper (2 GB para o serviço principal e 0,5 GB para o exportador de métricas).
- 100 GB de armazenamento atribuído a cada Pod através da
premium-rwo
storageClass, 100 para dados do Kafka e 90/10 para dados/registo do Zookeeper. - Tolerâncias, nodeAffinities e podAntiAffinities configurados para cada carga de trabalho, garantindo a distribuição adequada pelos nós, usando os respetivos node pools e zonas diferentes.
- Comunicação no cluster protegida por certificados autoassinados através de uma autoridade de certificação que fornece.
Esta configuração representa a configuração mínima necessária para criar um cluster Kafka pronto para produção. As secções seguintes demonstram configurações personalizadas para abordar aspetos como a segurança do cluster, as listas de controlo de acesso (ACLs), a gestão de tópicos, a gestão de certificados e muito mais.
Crie um cluster Kafka básico
Gere um par de CA:
openssl genrsa -out ca-key.pem 2048 openssl req -new -key ca-key.pem -x509 \ -days 1000 \ -out ca.pem \ -subj "/C=US/ST=CA/L=Confluent/O=Confluent/OU=Operator/CN=MyCA"
O Confluent for Kubernetes fornece certificados gerados automaticamente para os componentes da Confluent Platform para utilização na encriptação de rede TLS. Tem de gerar e fornecer uma autoridade de certificação (AC).
Crie um segredo do Kubernetes para a autoridade de certificação:
kubectl create secret tls ca-pair-sslcerts --cert=ca.pem --key=ca-key.pem -n kafka
O nome do segredo é predefinido
Crie um novo cluster do Kafka com a configuração básica:
kubectl apply -n kafka -f kafka-confluent/manifests/01-basic-cluster/my-cluster.yaml
Este comando cria um recurso personalizado do Kafka e um recurso personalizado do Zookeeper do operador CFK que inclui pedidos e limites de CPU e memória, pedidos de armazenamento de blocos e contaminações e afinidades para distribuir os pods aprovisionados pelos nós do Kubernetes.
Aguarde alguns minutos enquanto o Kubernetes inicia as cargas de trabalho necessárias:
kubectl wait pods -l app=my-cluster --for condition=Ready --timeout=300s -n kafka
Verifique se as cargas de trabalho do Kafka foram criadas:
kubectl get pod,svc,statefulset,deploy,pdb -n kafka
O resultado é semelhante ao seguinte:
NAME READY STATUS RESTARTS AGE pod/confluent-operator-864c74d4b4-fvpxs 1/1 Running 0 49m pod/my-cluster-0 1/1 Running 0 17m pod/my-cluster-1 1/1 Running 0 17m pod/my-cluster-2 1/1 Running 0 17m pod/zookeeper-0 1/1 Running 0 18m pod/zookeeper-1 1/1 Running 0 18m pod/zookeeper-2 1/1 Running 0 18m NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE service/confluent-operator ClusterIP 10.52.13.164 <none> 7778/TCP 49m service/my-cluster ClusterIP None <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/my-cluster-0-internal ClusterIP 10.52.2.242 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/my-cluster-1-internal ClusterIP 10.52.7.98 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/my-cluster-2-internal ClusterIP 10.52.4.226 <none> 9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP 17m service/zookeeper ClusterIP None <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m service/zookeeper-0-internal ClusterIP 10.52.8.52 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m service/zookeeper-1-internal ClusterIP 10.52.12.44 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m service/zookeeper-2-internal ClusterIP 10.52.12.134 <none> 2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP 18m NAME READY AGE statefulset.apps/my-cluster 3/3 17m statefulset.apps/zookeeper 3/3 18m NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/confluent-operator 1/1 1 1 49m NAME MIN AVAILABLE MAX UNAVAILABLE ALLOWED DISRUPTIONS AGE poddisruptionbudget.policy/my-cluster N/A 1 1 17m poddisruptionbudget.policy/zookeeper N/A 1 1 18m
O operador cria os seguintes recursos:
- Dois StatefulSets para Kafka e ZooKeeper.
- Três pods para réplicas de agente Kafka.
- Três pods para réplicas do ZooKeeper.
- Dois recursos
PodDisruptionBudget
, garantindo um máximo de uma réplica indisponível para a consistência do cluster. - O serviço
my-cluster
que serve como servidor de arranque para clientes Kafka que se ligam a partir do cluster Kubernetes. Todos os ouvintes internos do Kafka estão disponíveis neste serviço. - O serviço
zookeeper
que permite que os agentes Kafka se liguem aos nós ZooKeeper como clientes.
Autenticação e gestão de utilizadores
Esta secção mostra como ativar a autenticação e a autorização para proteger os ouvintes do Kafka e partilhar credenciais com os clientes.
O Confluent for Kubernetes suporta vários métodos de autenticação para o Kafka, como:
- Autenticação SASL/PLAIN: os clientes usam um nome de utilizador e uma palavra-passe para autenticação. O nome de utilizador e a palavra-passe são armazenados do lado do servidor num segredo do Kubernetes.
- SASL/PLAIN com autenticação LDAP: Os clientes usam um nome de utilizador e uma palavra-passe para autenticação. As credenciais são armazenadas num servidor LDAP.
- Autenticação mTLS: Os clientes usam certificados TLS para autenticação.
Limitações
- O CFK não fornece recursos personalizados para a gestão de utilizadores. No entanto, pode armazenar credenciais em secrets e consultar secrets nas especificações do ouvinte.
- Embora não exista um recurso personalizado para gerir as ACLs diretamente, o Confluent para Kubernetes oficial fornece orientações sobre a configuração de ACLs através da CLI do Kafka.
Crie um utilizador
Esta secção mostra como implementar um operador CFK que demonstra as capacidades de gestão de utilizadores, incluindo:
- Um cluster Kafka com autenticação baseada em palavra-passe (SASL/PLAIN) ativada num dos ouvintes
- Uma
KafkaTopic
com 3 réplicas - Credenciais de utilizador com autorizações de leitura e escrita
Crie um segredo com credenciais do utilizador:
export USERNAME=my-user export PASSWORD=$(openssl rand -base64 12) kubectl create secret generic my-user-credentials -n kafka \ --from-literal=plain-users.json="{\"$USERNAME\":\"$PASSWORD\"}"
As credenciais devem ser armazenadas no seguinte formato:
{ "username1": "password1", "username2": "password2", ... "usernameN": "passwordN" }
Configure o cluster Kafka para usar um ouvinte com autenticação SCRAM-SHA-512 baseada em palavra-passe na porta 9094:
kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-cluster.yaml
Configure um tópico e um pod de cliente para interagir com o cluster Kafka e executar comandos Kafka:
kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-topic.yaml kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/kafkacat.yaml
O GKE monta o segredo
my-user-credentials
no pod do cliente como um volume.Quando o pod do cliente estiver pronto, estabeleça ligação ao mesmo e comece a produzir e a consumir mensagens através das credenciais fornecidas:
kubectl wait pod kafkacat --for=condition=Ready --timeout=300s -n kafka kubectl exec -it kafkacat -n kafka -- /bin/sh
Produza uma mensagem com as credenciais da
my-user
e, em seguida, consuma a mensagem para verificar a respetiva receção.export USERNAME=$(cat /my-user/plain-users.json|cut -d'"' -f 2) export PASSWORD=$(cat /my-user/plain-users.json|cut -d'"' -f 4) echo "Message from my-user" |kcat \ -b my-cluster.kafka.svc.cluster.local:9094 \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=PLAIN \ -X sasl.username=$USERNAME \ -X sasl.password=$PASSWORD \ -t my-topic -P kcat -b my-cluster.kafka.svc.cluster.local:9094 \ -X security.protocol=SASL_SSL \ -X sasl.mechanisms=PLAIN \ -X sasl.username=$USERNAME \ -X sasl.password=$PASSWORD \ -t my-topic -C
O resultado é semelhante ao seguinte:
Message from my-user % Reached end of topic my-topic [1] at offset 1 % Reached end of topic my-topic [2] at offset 0 % Reached end of topic my-topic [0] at offset 0
Escreva
CTRL+C
para parar o processo do consumidor. Se receber um erroConnect refused
, aguarde alguns minutos e, em seguida, tente novamente.Saia da shell do Pod
exit
Cópias de segurança e recuperação de desastres
Com o operador Confluent, pode implementar estratégias de cópia de segurança eficientes seguindo determinados padrões.
Pode usar a cópia de segurança do GKE para fazer uma cópia de segurança do seguinte:
- Manifestos de recursos do Kubernetes.
- Recursos personalizados da API Confluent e respetivas definições extraídas do servidor da API Kubernetes do cluster em fase de cópia de segurança.
- Volumes que correspondem aos recursos PersistentVolumeClaim encontrados nos manifestos.
Para mais informações sobre como fazer uma cópia de segurança e restaurar clusters do Kafka através da cópia de segurança para o GKE, consulte o artigo Prepare-se para a recuperação de desastres.
Também pode fazer uma cópia de segurança manual do cluster Kafka. Deve fazer uma cópia de segurança:
- A configuração do Kafka, que inclui todos os recursos personalizados da API Confluent, como
KafkaTopics
ouConnect
- Os dados, que são armazenados nos PersistentVolumes dos agentes Kafka
O armazenamento de manifestos de recursos do Kubernetes, incluindo configurações do Confluent, em repositórios Git pode eliminar a necessidade de uma cópia de segurança separada para a configuração do Kafka, uma vez que os recursos podem ser reaplicados a um novo cluster do Kubernetes quando necessário.
Para salvaguardar a recuperação de dados do Kafka em cenários em que uma instância do servidor Kafka ou um cluster do Kubernetes onde o Kafka está implementado é perdido, recomendamos que configure a classe de armazenamento do Kubernetes usada para aprovisionar volumes para agentes do Kafka com a opção reclaimPolicy
definida como Retain
. Também recomendamos que tire
capturas de ecrã
dos volumes do agente Kafka.
O manifesto seguinte descreve uma StorageClass que usa a opção reclaimPolicy
Retain
:
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer
O exemplo seguinte mostra a StorageClass adicionada ao spec
de um recurso personalizado do cluster Kafka:
...
spec:
...
dataVolumeCapacity: 100Gi
storageClass:
name: premium-rwo-retain
Com esta configuração, os PersistentVolumes aprovisionados através da classe de armazenamento não são eliminados, mesmo quando o PersistentVolumeClaim correspondente é eliminado.
Para recuperar a instância do Kafka num novo cluster do Kubernetes com a configuração existente e os dados da instância do agente:
- Aplique os recursos personalizados da Confluent existentes (
Kafka
,KafkaTopic
,Zookeeper
, etc.) a um novo cluster do Kubernetes - Atualize os PersistentVolumeClaims com o nome das novas instâncias do agente Kafka para os PersistentVolumes antigos através da propriedade
spec.volumeName
no PersistentVolumeClaim.