Implemente o Apache Kafka no GKE com o Confluent

O guia mostra como usar o operador Confluent for Kubernetes (CFK) para implementar clusters do Apache Kafka no Google Kubernetes Engine (GKE).

O Kafka é um sistema de mensagens de publicação/subscrição distribuído e de código aberto para processar dados de streaming em tempo real, de alto volume e de alto débito. Pode usar o Kafka para criar pipelines de dados de streaming que movem dados de forma fiável em diferentes sistemas e aplicações para processamento e análise.

Este guia destina-se a administradores de plataformas, arquitetos da nuvem e profissionais de operações interessados na implementação de clusters Kafka no GKE.

Também pode usar o operador CFK para implementar outros componentes da Confluent Platform, como o centro de controlo da Confluent baseado na Web, o registo de esquemas ou o KsqlDB. No entanto, este guia foca-se apenas nas implementações do Kafka.

Prepare o ambiente

Neste tutorial, vai usar o Cloud Shell para gerir recursos alojados no Google Cloud. O Cloud Shell está pré-instalado com o software necessário para este tutorial, incluindo kubectl, a CLI gcloud, Helm> e Terraform.

Para configurar o seu ambiente com o Cloud Shell, siga estes passos:

  1. Inicie uma sessão do Cloud Shell a partir da Google Cloud consola, clicando em Ícone de ativação do Cloud Shell Ativar Cloud Shell na Google Cloud consola. Esta ação inicia uma sessão no painel inferior da Google Cloud consola.

  2. Defina variáveis de ambiente:

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    

    Substitua PROJECT_ID: your Google Cloud pelo seu ID do projeto.

  3. Clone o repositório do GitHub:

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  4. Mude para o diretório de trabalho:

    cd kubernetes-engine-samples/streaming
    

Crie a infraestrutura do cluster

Nesta secção, executa um script do Terraform para criar um cluster do GKE privado, de alta disponibilidade e regional. Os passos seguintes permitem o acesso público ao plano de controlo. Para restringir o acesso, crie um cluster privado.

Pode instalar o operador através de um cluster padrão ou do Autopilot.

Standard

O diagrama seguinte mostra um cluster GKE padrão regional privado implementado em três zonas diferentes:

Para implementar esta infraestrutura, execute os seguintes comandos a partir do Cloud Shell:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

Quando lhe for pedido, escreva yes. Este comando pode demorar vários minutos a ser concluído e o cluster a apresentar o estado pronto.

O Terraform cria os seguintes recursos:

  • Uma rede VPC e uma sub-rede privada para os nós do Kubernetes.
  • Um router para aceder à Internet através de NAT.
  • Um cluster do GKE privado na região us-central1.
  • 2 conjuntos de nós com o dimensionamento automático ativado (1 a 2 nós por zona, mínimo de 1 nó por zona)
  • Um ServiceAccount com autorizações de registo e monitorização.
  • Cópia de segurança do GKE para recuperação de desastres.
  • Google Cloud Managed Service for Prometheus para monitorização de clusters.

O resultado é semelhante ao seguinte:

...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

Piloto automático

O diagrama seguinte mostra um cluster do GKE Autopilot regional privado:

Para implementar a infraestrutura, execute os seguintes comandos a partir do Cloud Shell:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

Quando lhe for pedido, escreva yes. Este comando pode demorar vários minutos a ser concluído e o cluster a apresentar o estado pronto.

O Terraform cria os seguintes recursos:

  • Rede VPC e sub-rede privada para os nós do Kubernetes.
  • Um router para aceder à Internet através de NAT.
  • Um cluster do GKE privado na região us-central1.
  • Um ServiceAccount com autorizações de registo e monitorização
  • Google Cloud Managed Service for Prometheus para monitorização de clusters.

O resultado é semelhante ao seguinte:

...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

Estabeleça ligação ao cluster

Configure o kubectl para comunicar com o cluster:

gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}

Implemente o operador CFK no seu cluster

Nesta secção, implementa o operador Confluent for Kubernetes (CFK) através de um gráfico Helm e, em seguida, implementa um cluster Kafka.

  1. Adicione o repositório do gráfico Helm da Confluent:

    helm repo add confluentinc https://packages.confluent.io/helm
    
  2. Adicione um espaço de nomes para o operador CFK e o cluster Kafka:

    kubectl create ns kafka
    
  3. Implemente o operador do cluster do CFK através do Helm:

    helm install confluent-operator confluentinc/confluent-for-kubernetes -n kafka
    

    Para permitir que o CFK faça a gestão de recursos em todos os espaços de nomes, adicione o parâmetro --set-namespaced=false ao comando Helm.

  4. Verifique se o operador Confluent foi implementado com êxito usando o Helm:

    helm ls -n kafka
    

    O resultado é semelhante ao seguinte:

    NAME                  NAMESPACE  REVISION UPDATED                                  STATUS      CHART                                APP VERSION
    confluent-operator    kafka      1        2023-07-07 10:57:45.409158 +0200 CEST    deployed    confluent-for-kubernetes-0.771.13    2.6.0
    

Implemente o Kafka

Nesta secção, implementa o Kafka numa configuração básica e, em seguida, experimenta vários cenários de configuração avançada para resolver os requisitos de disponibilidade, segurança e observabilidade.

Configuração básica

A configuração básica da instância do Kafka inclui os seguintes componentes:

  • Três réplicas de agentes Kafka, com um mínimo de duas réplicas disponíveis necessárias para a consistência do cluster.
  • Três réplicas de nós do ZooKeeper, que formam um cluster.
  • Dois ouvintes do Kafka: um sem autenticação e outro que usa a autenticação TLS com um certificado gerado pelo CFK.
  • MaxHeapSize e MinHeapSize do Java definidos como 4 GB para o Kafka.
  • A atribuição de recursos da CPU de 1 pedido de CPU e 2 limites de CPU, e 5 GB de pedidos e limites de memória para o Kafka (4 GB para o serviço principal e 0,5 GB para o exportador de métricas) e 3 GB para o Zookeeper (2 GB para o serviço principal e 0,5 GB para o exportador de métricas).
  • 100 GB de armazenamento atribuído a cada Pod através da premium-rwostorageClass, 100 para dados do Kafka e 90/10 para dados/registo do Zookeeper.
  • Tolerâncias, nodeAffinities e podAntiAffinities configurados para cada carga de trabalho, garantindo a distribuição adequada pelos nós, usando os respetivos node pools e zonas diferentes.
  • Comunicação no cluster protegida por certificados autoassinados através de uma autoridade de certificação que fornece.

Esta configuração representa a configuração mínima necessária para criar um cluster Kafka pronto para produção. As secções seguintes demonstram configurações personalizadas para abordar aspetos como a segurança do cluster, as listas de controlo de acesso (ACLs), a gestão de tópicos, a gestão de certificados e muito mais.

Crie um cluster Kafka básico

  1. Gere um par de CA:

    openssl genrsa -out ca-key.pem 2048
    openssl req -new -key ca-key.pem -x509 \
      -days 1000 \
      -out ca.pem \
      -subj "/C=US/ST=CA/L=Confluent/O=Confluent/OU=Operator/CN=MyCA"
    

    O Confluent for Kubernetes fornece certificados gerados automaticamente para os componentes da Confluent Platform para utilização na encriptação de rede TLS. Tem de gerar e fornecer uma autoridade de certificação (AC).

  2. Crie um segredo do Kubernetes para a autoridade de certificação:

    kubectl create secret tls ca-pair-sslcerts --cert=ca.pem --key=ca-key.pem -n kafka
    

    O nome do segredo é predefinido

  3. Crie um novo cluster do Kafka com a configuração básica:

    kubectl apply -n kafka -f kafka-confluent/manifests/01-basic-cluster/my-cluster.yaml
    

    Este comando cria um recurso personalizado do Kafka e um recurso personalizado do Zookeeper do operador CFK que inclui pedidos e limites de CPU e memória, pedidos de armazenamento de blocos e contaminações e afinidades para distribuir os pods aprovisionados pelos nós do Kubernetes.

  4. Aguarde alguns minutos enquanto o Kubernetes inicia as cargas de trabalho necessárias:

    kubectl wait pods -l app=my-cluster --for condition=Ready --timeout=300s -n kafka
    
  5. Verifique se as cargas de trabalho do Kafka foram criadas:

    kubectl get pod,svc,statefulset,deploy,pdb -n kafka
    

    O resultado é semelhante ao seguinte:

    NAME                                    READY   STATUS  RESTARTS   AGE
    pod/confluent-operator-864c74d4b4-fvpxs   1/1   Running   0        49m
    pod/my-cluster-0                        1/1   Running   0        17m
    pod/my-cluster-1                        1/1   Running   0        17m
    pod/my-cluster-2                        1/1   Running   0        17m
    pod/zookeeper-0                         1/1   Running   0        18m
    pod/zookeeper-1                         1/1   Running   0        18m
    pod/zookeeper-2                         1/1   Running   0        18m
    
    NAME                          TYPE      CLUSTER-IP   EXTERNAL-IP   PORT(S)                                                        AGE
    service/confluent-operator    ClusterIP   10.52.13.164   <none>      7778/TCP                                                       49m
    service/my-cluster            ClusterIP   None         <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-0-internal   ClusterIP   10.52.2.242  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-1-internal   ClusterIP   10.52.7.98   <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-2-internal   ClusterIP   10.52.4.226  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/zookeeper             ClusterIP   None         <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-0-internal  ClusterIP   10.52.8.52   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-1-internal  ClusterIP   10.52.12.44  <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-2-internal  ClusterIP   10.52.12.134   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    
    NAME                        READY   AGE
    statefulset.apps/my-cluster   3/3   17m
    statefulset.apps/zookeeper  3/3   18m
    
    NAME                               READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/confluent-operator   1/1   1          1         49m
    
    NAME                                  MIN AVAILABLE   MAX UNAVAILABLE   ALLOWED DISRUPTIONS   AGE
    poddisruptionbudget.policy/my-cluster   N/A           1               1                   17m
    poddisruptionbudget.policy/zookeeper  N/A           1               1                   18m
    

O operador cria os seguintes recursos:

  • Dois StatefulSets para Kafka e ZooKeeper.
  • Três pods para réplicas de agente Kafka.
  • Três pods para réplicas do ZooKeeper.
  • Dois recursos PodDisruptionBudget, garantindo um máximo de uma réplica indisponível para a consistência do cluster.
  • O serviço my-cluster que serve como servidor de arranque para clientes Kafka que se ligam a partir do cluster Kubernetes. Todos os ouvintes internos do Kafka estão disponíveis neste serviço.
  • O serviço zookeeper que permite que os agentes Kafka se liguem aos nós ZooKeeper como clientes.

Autenticação e gestão de utilizadores

Esta secção mostra como ativar a autenticação e a autorização para proteger os ouvintes do Kafka e partilhar credenciais com os clientes.

O Confluent for Kubernetes suporta vários métodos de autenticação para o Kafka, como:

  • Autenticação SASL/PLAIN: os clientes usam um nome de utilizador e uma palavra-passe para autenticação. O nome de utilizador e a palavra-passe são armazenados do lado do servidor num segredo do Kubernetes.
  • SASL/PLAIN com autenticação LDAP: Os clientes usam um nome de utilizador e uma palavra-passe para autenticação. As credenciais são armazenadas num servidor LDAP.
  • Autenticação mTLS: Os clientes usam certificados TLS para autenticação.

Limitações

  • O CFK não fornece recursos personalizados para a gestão de utilizadores. No entanto, pode armazenar credenciais em secrets e consultar secrets nas especificações do ouvinte.
  • Embora não exista um recurso personalizado para gerir as ACLs diretamente, o Confluent para Kubernetes oficial fornece orientações sobre a configuração de ACLs através da CLI do Kafka.

Crie um utilizador

Esta secção mostra como implementar um operador CFK que demonstra as capacidades de gestão de utilizadores, incluindo:

  • Um cluster Kafka com autenticação baseada em palavra-passe (SASL/PLAIN) ativada num dos ouvintes
  • Uma KafkaTopic com 3 réplicas
  • Credenciais de utilizador com autorizações de leitura e escrita
  1. Crie um segredo com credenciais do utilizador:

    export USERNAME=my-user
    export PASSWORD=$(openssl rand -base64 12)
    kubectl create secret generic my-user-credentials -n kafka \
      --from-literal=plain-users.json="{\"$USERNAME\":\"$PASSWORD\"}"
    

    As credenciais devem ser armazenadas no seguinte formato:

    {
    "username1": "password1",
    "username2": "password2",
    ...
    "usernameN": "passwordN"
    }
    
  2. Configure o cluster Kafka para usar um ouvinte com autenticação SCRAM-SHA-512 baseada em palavra-passe na porta 9094:

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-cluster.yaml
    
  3. Configure um tópico e um pod de cliente para interagir com o cluster Kafka e executar comandos Kafka:

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-topic.yaml
    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/kafkacat.yaml
    

    O GKE monta o segredo my-user-credentials no pod do cliente como um volume.

  4. Quando o pod do cliente estiver pronto, estabeleça ligação ao mesmo e comece a produzir e a consumir mensagens através das credenciais fornecidas:

    kubectl wait pod kafkacat --for=condition=Ready --timeout=300s -n kafka
    kubectl exec -it kafkacat -n kafka -- /bin/sh
    
  5. Produza uma mensagem com as credenciais da my-user e, em seguida, consuma a mensagem para verificar a respetiva receção.

    export USERNAME=$(cat /my-user/plain-users.json|cut -d'"' -f 2)
    export PASSWORD=$(cat /my-user/plain-users.json|cut -d'"' -f 4)
    echo "Message from my-user" |kcat \
      -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -P
    kcat -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -C
    

    O resultado é semelhante ao seguinte:

    Message from my-user
    % Reached end of topic my-topic [1] at offset 1
    % Reached end of topic my-topic [2] at offset 0
    % Reached end of topic my-topic [0] at offset 0
    

    Escreva CTRL+C para parar o processo do consumidor. Se receber um erro Connect refused , aguarde alguns minutos e, em seguida, tente novamente.

  6. Saia da shell do Pod

    exit
    

Cópias de segurança e recuperação de desastres

Com o operador Confluent, pode implementar estratégias de cópia de segurança eficientes seguindo determinados padrões.

Pode usar a cópia de segurança do GKE para fazer uma cópia de segurança do seguinte:

  • Manifestos de recursos do Kubernetes.
  • Recursos personalizados da API Confluent e respetivas definições extraídas do servidor da API Kubernetes do cluster em fase de cópia de segurança.
  • Volumes que correspondem aos recursos PersistentVolumeClaim encontrados nos manifestos.

Para mais informações sobre como fazer uma cópia de segurança e restaurar clusters do Kafka através da cópia de segurança para o GKE, consulte o artigo Prepare-se para a recuperação de desastres.

Também pode fazer uma cópia de segurança manual do cluster Kafka. Deve fazer uma cópia de segurança:

  • A configuração do Kafka, que inclui todos os recursos personalizados da API Confluent, como KafkaTopicsouConnect
  • Os dados, que são armazenados nos PersistentVolumes dos agentes Kafka

O armazenamento de manifestos de recursos do Kubernetes, incluindo configurações do Confluent, em repositórios Git pode eliminar a necessidade de uma cópia de segurança separada para a configuração do Kafka, uma vez que os recursos podem ser reaplicados a um novo cluster do Kubernetes quando necessário.

Para salvaguardar a recuperação de dados do Kafka em cenários em que uma instância do servidor Kafka ou um cluster do Kubernetes onde o Kafka está implementado é perdido, recomendamos que configure a classe de armazenamento do Kubernetes usada para aprovisionar volumes para agentes do Kafka com a opção reclaimPolicy definida como Retain. Também recomendamos que tire capturas de ecrã dos volumes do agente Kafka.

O manifesto seguinte descreve uma StorageClass que usa a opção reclaimPolicy Retain:

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer

O exemplo seguinte mostra a StorageClass adicionada ao spec de um recurso personalizado do cluster Kafka:

...
spec:
  ...
  dataVolumeCapacity: 100Gi
  storageClass:
  name: premium-rwo-retain

Com esta configuração, os PersistentVolumes aprovisionados através da classe de armazenamento não são eliminados, mesmo quando o PersistentVolumeClaim correspondente é eliminado.

Para recuperar a instância do Kafka num novo cluster do Kubernetes com a configuração existente e os dados da instância do agente:

  1. Aplique os recursos personalizados da Confluent existentes (Kafka, KafkaTopic, Zookeeper, etc.) a um novo cluster do Kubernetes
  2. Atualize os PersistentVolumeClaims com o nome das novas instâncias do agente Kafka para os PersistentVolumes antigos através da propriedade spec.volumeName no PersistentVolumeClaim.