Implantar o Apache Kafka no GKE usando o Confluent

O guia mostra como usar o operador do Confluent para Kubernetes (CFK) ao implantar clusters do Apache Kafka no Google Kubernetes Engine (GKE).

O Kafka é um sistema de mensagens de publicação/assinatura distribuído e de código aberto que ajuda você a lidar com dados de streaming de alto volume, de alta capacidade de processamento e em tempo real. É possível usar o Kafka para criar pipelines de dados de streaming a fim de mover dados de maneira confiável entre diferentes sistemas e aplicativos para processamento e análise.

Este guia é destinado a administradores de plataformas, arquitetos de nuvem e profissionais de operações interessados em implantar clusters do Kafka no GKE.

Também é possível usar o operador do CFK para implantar outros componentes da plataforma Confluent, como o centro de controle do Confluent baseado na Web, o Schema Registry ou o KsqlDB. No entanto, este guia foca apenas nas implantações do Kafka.

Prepare o ambiente

Neste tutorial, você vai usar o Cloud Shell para gerenciar recursos hospedados no Google Cloud. O Cloud Shell vem pré-instalado com o software necessário para este tutorial, incluindo kubectl, a CLI gcloud, o Helm e o Terraform.

Para configurar o ambiente com o Cloud Shell, siga estas etapas:

  1. Inicie uma sessão do Cloud Shell no console do Google Cloud clicando em Ícone de ativação do Cloud Shell Ativar o Cloud Shell no console doGoogle Cloud . Isso inicia uma sessão no painel inferior do console Google Cloud .

  2. Defina as variáveis de ambiente:

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    

    Substitua PROJECT_ID: seu Google Cloud pelo ID do projeto.

  3. Clone o repositório do GitHub:

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  4. Mude para o diretório de trabalho:

    cd kubernetes-engine-samples/streaming
    

Criar a infraestrutura do cluster

Nesta seção, você executa um script do Terraform para criar um cluster regional do GKE privado e altamente disponível. As etapas a seguir permitem acesso público ao plano de controle. Para restringir o acesso, crie um cluster particular.

É possível instalar o operador usando um cluster padrão ou Autopilot.

Padrão

O diagrama a seguir mostra um cluster regional padrão particular do GKE implantado em três zonas diferentes:

Para implantar essa infraestrutura, execute os seguintes comandos no Cloud Shell:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

Quando solicitado, digite yes. Pode levar vários minutos para que esse comando seja concluído e o cluster mostre um status pronto.

O Terraform cria os seguintes recursos:

  • Uma rede VPC e uma sub-rede particular para os nós do Kubernetes.
  • Um roteador para acessar a Internet por meio de NAT.
  • Um cluster particular do GKE na região us-central1.
  • 2 pools de nós com escalonamento automático ativado (de 1 a 2 nós por zona, no mínimo 1 nó por zona)
  • Um ServiceAccount com permissões de geração de registros e monitoramento.
  • Backup do GKE para recuperação de desastres.
  • Google Cloud Managed Service para Prometheus para monitoramento de clusters.

O resultado será assim:

...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

Piloto automático

O diagrama a seguir mostra um cluster particular regional do Autopilot do GKE:

Para implantar a infraestrutura, execute os seguintes comandos do Cloud Shell:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

Quando solicitado, digite yes. Pode levar vários minutos para que esse comando seja concluído e o cluster mostre um status pronto.

O Terraform cria os seguintes recursos:

  • Rede VPC e sub-rede privada para os nós do Kubernetes.
  • Um roteador para acessar a Internet por meio de NAT.
  • Um cluster particular do GKE na região us-central1.
  • Um ServiceAccount com permissões de registro e monitoramento
  • Google Cloud Managed Service para Prometheus para monitoramento de clusters.

O resultado será assim:

...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials kafka-cluster --region us-central1"

Conexão ao cluster

Configure kubectl para se comunicar com o cluster:

gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}

Implantar o operador do CFK no cluster

Nesta seção, você implantará o operador do Confluent para Kubernetes (CFK) usando um gráfico do Helm e, em seguida, implantará um cluster do Kafka.

  1. Adicione o repositório de gráficos do Helm do Confluent:

    helm repo add confluentinc https://packages.confluent.io/helm
    
  2. Adicione um namespace para o operador do CFK e o cluster do Kafka:

    kubectl create ns kafka
    
  3. Implante o operador de cluster do CFK usando o Helm:

    helm install confluent-operator confluentinc/confluent-for-kubernetes -n kafka
    

    Para permitir que o CFK gerencie os recursos em todos os namespaces, adicione o parâmetro --set-namespaced=false ao comando do Helm.

  4. Verifique se o operador do Confluent foi implantado com sucesso usando o Helm:

    helm ls -n kafka
    

    O resultado será assim:

    NAME                  NAMESPACE  REVISION UPDATED                                  STATUS      CHART                                APP VERSION
    confluent-operator    kafka      1        2023-07-07 10:57:45.409158 +0200 CEST    deployed    confluent-for-kubernetes-0.771.13    2.6.0
    

Implantar o Kafka

Nesta seção, você implantará o Kafka em uma configuração básica e, em seguida, testará vários cenários de configuração avançada para atender aos requisitos de disponibilidade, segurança e observabilidade.

Configuração básica

A configuração básica da instância do Kafka inclui os seguintes componentes:

  • Três réplicas de agentes do Kafka, com um mínimo de duas réplicas disponíveis necessárias para a consistência do cluster.
  • Três réplicas de nós do ZooKeeper, formando um cluster.
  • Dois listeners do Kafka: um sem autenticação e outro que usa a autenticação TLS com um certificado gerado pelo CFK.
  • MaxHeapSize e MinHeapSize de Java definidos como 4 GB para o Kafka.
  • Alocação de recursos de CPU de uma solicitação de CPU e dois limites de CPU, 5 GB de limites e solicitações de memória para o Kafka (4 GB para o serviço principal e 0,5 GB para o exportador de métricas) e 3 GB para o Zookeeper (2 GB para o serviço principal e 0,5 GB para o exportador de métricas).
  • 100 GB de armazenamento alocado para cada pod usando a storageClass premium-rwo, 100 para dados do Kafka e 90/10 para dados/registros do Zookeeper.
  • Tolerâncias, nodeAffinities e podAntiAffinities configurados para cada carga de trabalho, garantindo a distribuição apropriada entre nós, utilizando os respectivos pools de nós e zonas diferentes.
  • Comunicação dentro do cluster protegida por certificados autoassinados usando uma autoridade de certificação fornecida por você.

Essa configuração representa a configuração mínima necessária para criar um cluster do Kafka pronto para produção. As seções a seguir demonstram configurações personalizadas para abordar aspectos como a segurança do cluster, as listas de controle de acesso (ACLs), o gerenciamento de tópicos, o gerenciamento de certificados e muito mais.

Criar um cluster básico do Kafka

  1. Gere um par de ACs:

    openssl genrsa -out ca-key.pem 2048
    openssl req -new -key ca-key.pem -x509 \
      -days 1000 \
      -out ca.pem \
      -subj "/C=US/ST=CA/L=Confluent/O=Confluent/OU=Operator/CN=MyCA"
    

    O Confluent para Kubernetes fornece certificados gerados automaticamente para componentes da plataforma do Confluent a serem usados na criptografia de rede TLS. É preciso gerar e fornecer uma autoridade certificadora (AC).

  2. Crie um secret do Kubernetes para a autoridade certificadora:

    kubectl create secret tls ca-pair-sslcerts --cert=ca.pem --key=ca-key.pem -n kafka
    

    O nome do secret é predefinido.

  3. Crie um novo cluster do Kafka usando a configuração básica:

    kubectl apply -n kafka -f kafka-confluent/manifests/01-basic-cluster/my-cluster.yaml
    

    Esse comando cria um recurso personalizado do Kafka e um do Zookeeper para o operador do CFK que inclui solicitações e limites de CPU e memória, solicitações de armazenamento em blocos, taints e afinidades para distribuir os pods provisionados nos nós do Kubernetes.

  4. Aguarde alguns minutos enquanto o Kubernetes inicia as cargas de trabalho necessárias:

    kubectl wait pods -l app=my-cluster --for condition=Ready --timeout=300s -n kafka
    
  5. Verifique se as cargas de trabalho do Kafka foram criadas:

    kubectl get pod,svc,statefulset,deploy,pdb -n kafka
    

    O resultado será assim:

    NAME                                    READY   STATUS  RESTARTS   AGE
    pod/confluent-operator-864c74d4b4-fvpxs   1/1   Running   0        49m
    pod/my-cluster-0                        1/1   Running   0        17m
    pod/my-cluster-1                        1/1   Running   0        17m
    pod/my-cluster-2                        1/1   Running   0        17m
    pod/zookeeper-0                         1/1   Running   0        18m
    pod/zookeeper-1                         1/1   Running   0        18m
    pod/zookeeper-2                         1/1   Running   0        18m
    
    NAME                          TYPE      CLUSTER-IP   EXTERNAL-IP   PORT(S)                                                        AGE
    service/confluent-operator    ClusterIP   10.52.13.164   <none>      7778/TCP                                                       49m
    service/my-cluster            ClusterIP   None         <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-0-internal   ClusterIP   10.52.2.242  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-1-internal   ClusterIP   10.52.7.98   <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/my-cluster-2-internal   ClusterIP   10.52.4.226  <none>      9092/TCP,8090/TCP,9071/TCP,7203/TCP,7777/TCP,7778/TCP,9072/TCP   17m
    service/zookeeper             ClusterIP   None         <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-0-internal  ClusterIP   10.52.8.52   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-1-internal  ClusterIP   10.52.12.44  <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    service/zookeeper-2-internal  ClusterIP   10.52.12.134   <none>      2181/TCP,7203/TCP,7777/TCP,3888/TCP,2888/TCP,7778/TCP          18m
    
    NAME                        READY   AGE
    statefulset.apps/my-cluster   3/3   17m
    statefulset.apps/zookeeper  3/3   18m
    
    NAME                               READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/confluent-operator   1/1   1          1         49m
    
    NAME                                  MIN AVAILABLE   MAX UNAVAILABLE   ALLOWED DISRUPTIONS   AGE
    poddisruptionbudget.policy/my-cluster   N/A           1               1                   17m
    poddisruptionbudget.policy/zookeeper  N/A           1               1                   18m
    

O operador cria os seguintes recursos:

  • Dois StatefulSets para o Kafka e o ZooKeeper.
  • Três pods para réplicas do agente do Kafka.
  • Três pods para réplicas do ZooKeeper.
  • Dois recursos PodDisruptionBudget, a fim de garantir no máximo uma réplica indisponível para consistência do cluster.
  • O serviço my-cluster que atua como o servidor de inicialização para os clientes do Kafka que se conectam de dentro do cluster do Kubernetes. Todos os listeners internos do Kafka estão disponíveis neste serviço.
  • O serviço zookeeper, que permite que os agentes do Kafka se conectem aos nós do ZooKeeper como clientes.

Autenticação e gerenciamento de usuários

Esta seção mostra como ativar a autenticação e autorização para proteger os listeners do Kafka e compartilhar credenciais com os clientes.

O Confluent para Kubernetes oferece suporte a vários métodos de autenticação para Kafka, como os seguintes:

  • Autenticação SASL/PLAIN: os clientes usam um nome de usuário e uma senha para fazer a autenticação. O nome de usuário e a senha são armazenados no servidor em um secret do Kubernetes.
  • SASL/PLAIN com autenticação LDAP: os clientes usam um nome de usuário e uma senha para fazer a autenticação. As credenciais são armazenadas em um servidor LDAP.
  • Autenticação mTLS: os clientes usam certificados TLS para fazer a autenticação.

Limitações

  • O CFK não oferece recursos personalizados para o gerenciamento de usuários. No entanto, é possível armazenar credenciais em secrets e consultá-los nas especificações do listener.
  • Embora não haja um recurso personalizado para gerenciar as ACLs diretamente, o Confluent para Kubernetes oficial fornece orientações sobre como configurar ACLs usando a CLI do Kafka.

Criar um usuário

Esta seção mostra como implantar um operador do CFK que demonstra os recursos de gerenciamento de usuários, incluindo:

  • Um cluster do Kafka com autenticação baseada em senha (SASL/PLAIN) ativada em um dos listeners
  • Um KafkaTopic com três réplicas
  • Credenciais de usuário com permissões de leitura e gravação
  1. Crie um secret com as credenciais de usuário:

    export USERNAME=my-user
    export PASSWORD=$(openssl rand -base64 12)
    kubectl create secret generic my-user-credentials -n kafka \
      --from-literal=plain-users.json="{\"$USERNAME\":\"$PASSWORD\"}"
    

    As credenciais precisam ser armazenadas no seguinte formato:

    {
    "username1": "password1",
    "username2": "password2",
    ...
    "usernameN": "passwordN"
    }
    
  2. Configure o cluster do Kafka para usar um listener com a autenticação SCRAM-SHA-512 baseada em senha na porta 9094:

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-cluster.yaml
    
  3. Configure um tópico e um pod cliente para interagir com o cluster do Kafka e executar os comandos do Kafka:

    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/my-topic.yaml
    kubectl apply -n kafka -f kafka-confluent/manifests/02-auth/kafkacat.yaml
    

    O GKE monta o secret my-user-credentials no pod cliente como um volume.

  4. Quando o pod cliente estiver pronto, conecte-se a ele e comece a produzir e consumir mensagens usando as credenciais fornecidas:

    kubectl wait pod kafkacat --for=condition=Ready --timeout=300s -n kafka
    kubectl exec -it kafkacat -n kafka -- /bin/sh
    
  5. Produza uma mensagem usando as credenciais my-user e, em seguida, consuma essa mensagem para verificar o recebimento.

    export USERNAME=$(cat /my-user/plain-users.json|cut -d'"' -f 2)
    export PASSWORD=$(cat /my-user/plain-users.json|cut -d'"' -f 4)
    echo "Message from my-user" |kcat \
      -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -P
    kcat -b my-cluster.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=PLAIN \
      -X sasl.username=$USERNAME \
      -X sasl.password=$PASSWORD  \
      -t my-topic -C
    

    O resultado será assim:

    Message from my-user
    % Reached end of topic my-topic [1] at offset 1
    % Reached end of topic my-topic [2] at offset 0
    % Reached end of topic my-topic [0] at offset 0
    

    Digite CTRL+C para interromper o processo do consumidor. Se você receber um erro Connect refused, aguarde alguns minutos e tente novamente.

  6. Saia do shell do pod

    exit
    

Backups e recuperação de desastres

Com o operador do Confluent, é possível implementar estratégias de backup eficientes seguindo determinados padrões.

Use o Backup para GKE para fazer backup:

  • Manifestos do recurso do Kubernetes.
  • Recursos personalizados da API Confluent e definições relacionadas extraídos do servidor de API do Kubernetes do cluster que está passando pelo backup.
  • Volumes correspondentes aos recursos PersistentVolumeClaim encontrados nos manifestos.

Para mais informações sobre como fazer backup e restaurar clusters do Kafka usando o Backup para GKE, consulte Preparar-se para a recuperação de desastres.

Também é possível fazer um backup manual do cluster do Kafka. Faça o backup:

  • A configuração do Kafka, que inclui todos os recursos personalizados da API Confluent, como KafkaTopics ou Connect
  • Os dados, armazenados nos PersistentVolumes dos agentes do Kafka

Armazenar manifestos de recursos do Kubernetes, incluindo as configurações do Confluent, em repositórios Git pode eliminar a necessidade de um backup separado para a configuração do Kafka, porque os recursos podem ser reaplicados a um novo cluster do Kubernetes quando necessário.

Para proteger a recuperação de dados do Kafka em cenários em que uma instância de servidor do Kafka ou um cluster do Kubernetes em que o Kafka está implantado é perdido, recomendamos configurar a classe de armazenamento do Kubernetes usada para provisionar volumes aos agentes do Kafka com a opção reclaimPolicy definida como Retain. Também recomendamos realizar snapshots dos volumes do agente do Kafka.

O manifesto a seguir descreve um StorageClass que usa a opção reclaimPolicy Retain:

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer

O exemplo a seguir mostra o StorageClass adicionado ao spec de um recurso personalizado do cluster do Kafka:

...
spec:
  ...
  dataVolumeCapacity: 100Gi
  storageClass:
  name: premium-rwo-retain

Com essa configuração, os PersistentVolumes provisionados usando a classe de armazenamento não são excluídos, mesmo quando o PersistentVolumeClaim correspondente é excluído.

Para recuperar a instância do Kafka em um novo cluster do Kubernetes usando a configuração atual e os dados da instância do agente:

  1. Aplicar os recursos personalizados atuais do Confluent (Kafka, KafkaTopic, Zookeeper etc.) a um novo cluster do Kubernetes
  2. Atualize os PersistentVolumeClaims com o nome das novas instâncias do agente do Kafka para os PersistentVolumes antigos usando a propriedade spec.volumeName no PersistentVolumeClaim.