Implemente o Apache Kafka no GKE com o Strimzi

O guia mostra como usar o operador Strimzi para implementar clusters do Apache Kafka.

O Kafka é um sistema de mensagens distribuído de código aberto concebido para processar dados de streaming de alto volume, alto débito e em tempo real. Permite-lhe criar pipelines de dados de streaming para uma transferência de dados fiável em diferentes sistemas e aplicações, de modo a suportar tarefas de processamento e análise.

Os operadores são extensões de software que usam recursos personalizados para gerir aplicações e os respetivos componentes. Para saber mais sobre a motivação para usar operadores, consulte o padrão de operador na documentação de código aberto do Kubernetes. O operador Strimzi oferece flexibilidade nas opções de implementação e permite-lhe usar tolerâncias e contaminações do Kubernetes para executar o Kafka em nós dedicados.

Este guia destina-se a administradores de plataformas, arquitetos da nuvem e profissionais de operações interessados na implementação de clusters Kafka no GKE.

Esta solução é um bom ponto de partida se quiser saber como implementar clusters do Kafka usando um operador de terceiros para automatizar a gestão e reduzir os erros. Se preferir um controlo operacional mais detalhado, consulte o artigo Implemente clusters Kafka de elevada disponibilidade no GKE.

Prepare o ambiente

Neste tutorial, vai usar o Cloud Shell para gerir recursos alojados no Google Cloud. O Cloud Shell está pré-instalado com o software de que precisa para este tutorial, incluindo o kubectl, a CLI gcloud, o Helm e o Terraform.

Para configurar o seu ambiente com o Cloud Shell, siga estes passos:

  1. Inicie uma sessão do Cloud Shell a partir da Google Cloud consola, clicando em Ícone de ativação do Cloud Shell Ativar Cloud Shell na Google Cloud consola. Esta ação inicia uma sessão no painel inferior da Google Cloud consola.

  2. Defina variáveis de ambiente:

    export PROJECT_ID=PROJECT_ID
    export KUBERNETES_CLUSTER_PREFIX=kafka
    export REGION=us-central1
    

    Substitua PROJECT_ID: your Google Cloud pelo seu ID do projeto.

  3. Clone o repositório do GitHub:

    git clone https://github.com/GoogleCloudPlatform/kubernetes-engine-samples
    
  4. Mude para o diretório de trabalho:

    cd kubernetes-engine-samples/streaming/
    

Crie a infraestrutura do cluster

Nesta secção, executa um script do Terraform para criar um cluster do GKE privado, de alta disponibilidade e regional. Os passos seguintes permitem o acesso público ao plano de controlo. Para restringir o acesso, crie um cluster privado.

Pode instalar o operador através de um cluster padrão ou do Autopilot.

Standard

O diagrama seguinte mostra um cluster GKE padrão regional privado implementado em três zonas diferentes:

Para implementar esta infraestrutura, execute os seguintes comandos a partir do Cloud Shell:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-standard init
terraform -chdir=kafka/terraform/gke-standard apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

Quando lhe for pedido, escreva yes. Este comando pode demorar vários minutos a ser concluído e o cluster a apresentar o estado pronto.

O Terraform cria os seguintes recursos:

  • Uma rede VPC e uma sub-rede privada para os nós do Kubernetes.
  • Um router para aceder à Internet através de NAT.
  • Um cluster do GKE privado na região us-central1.
  • 2 conjuntos de nós com o dimensionamento automático ativado (1 a 2 nós por zona, mínimo de 1 nó por zona)
  • Um ServiceAccount com autorizações de registo e monitorização.
  • Cópia de segurança do GKE para recuperação de desastres.
  • Google Cloud Managed Service for Prometheus para monitorização de clusters.

O resultado é semelhante ao seguinte:

...
Apply complete! Resources: 14 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials strimzi-cluster --region us-central1"

Piloto automático

O diagrama seguinte mostra um cluster do GKE Autopilot regional privado:

Para implementar a infraestrutura, execute os seguintes comandos a partir do Cloud Shell:

export GOOGLE_OAUTH_ACCESS_TOKEN=$(gcloud auth print-access-token)
terraform -chdir=kafka/terraform/gke-autopilot init
terraform -chdir=kafka/terraform/gke-autopilot apply -var project_id=${PROJECT_ID} \
  -var region=${REGION} \
  -var cluster_prefix=${KUBERNETES_CLUSTER_PREFIX}

Quando lhe for pedido, escreva yes. Este comando pode demorar vários minutos a ser concluído e o cluster a apresentar o estado pronto.

O Terraform cria os seguintes recursos:

  • Rede VPC e sub-rede privada para os nós do Kubernetes.
  • Um router para aceder à Internet através de NAT.
  • Um cluster do GKE privado na região us-central1.
  • Um ServiceAccount com autorizações de registo e monitorização
  • Google Cloud Managed Service for Prometheus para monitorização de clusters.

O resultado é semelhante ao seguinte:

...
Apply complete! Resources: 12 added, 0 changed, 0 destroyed.

Outputs:

kubectl_connection_command = "gcloud container clusters get-credentials strimzi-cluster --region us-central1"

A ligar ao cluster

Configure o kubectl para comunicar com o cluster:

gcloud container clusters get-credentials ${KUBERNETES_CLUSTER_PREFIX}-cluster --region ${REGION}

Implemente o operador Strimzi no seu cluster

Nesta secção, implementa o operador Strimzi através de um gráfico Helm. Também existem várias outras formas de implementar o Strimzi.

  1. Adicione o repositório do gráfico Helm do Strimzi:

    helm repo add strimzi https://strimzi.io/charts/
    
  2. Adicione um espaço de nomes para o operador Strimzi e o cluster Kafka:

    kubectl create ns kafka
    
  3. Implemente o operador de cluster do Strimzi através do Helm:

    helm install strimzi-operator strimzi/strimzi-kafka-operator -n kafka
    

    Para implementar o operador de cluster Strimzi e os clusters Kafka em espaços de nomes diferentes, adicione o parâmetro --set watchNamespaces="{kafka-namespace,kafka-namespace-2,...}" ao comando helm.

  4. Confirme se o operador de cluster Strimzi foi implementado com êxito usando o Helm:

    helm ls -n kafka
    

    O resultado é semelhante ao seguinte:

    NAME            NAMESPACE    REVISION    UPDATED                              STATUS    CHART                        APP VERSION
    strimzi-operator    kafka      1       2023-06-27 11:22:15.850545 +0200 CEST    deployed    strimzi-kafka-operator-0.35.0    0.35.0
    

Implemente o Kafka

Depois de o operador ser implementado no cluster, tem tudo pronto para implementar uma instância de cluster do Kafka.

Nesta secção, implementa o Kafka numa configuração básica e, em seguida, experimenta vários cenários de configuração avançada para resolver os requisitos de disponibilidade, segurança e observabilidade.

Configuração básica

A configuração básica da instância do Kafka inclui os seguintes componentes:

  • Três réplicas de agentes Kafka, com um mínimo de duas réplicas disponíveis necessárias para a consistência do cluster.
  • Três réplicas de nós do ZooKeeper, que formam um cluster.
  • Dois ouvintes do Kafka: um sem autenticação e outro que usa a autenticação TLS com um certificado gerado pelo Strimzi.
  • MaxHeapSize e MinHeapSize do Java definidos como 4 GB para o Kafka e 2 GB para o ZooKeeper.
  • Alocação de recursos da CPU de 1 pedido de CPU e 2 limites de CPU para o Kafka e o ZooKeeper, juntamente com 5 GB de pedidos e limites de memória para o Kafka (4 GB para o serviço principal e 0,5 GB para o exportador de métricas) e 2,5 GB para o ZooKeeper (2 GB para o serviço principal e 0,5 GB para o exportador de métricas).
  • Entidade-operador com os seguintes pedidos e limites:
    • : 100 m/500 m de CPU e 128 Mi de memória.tlsSidecar
    • topicOperator: 100 m/500 m de CPU e 512 Mi de memória.
    • userOperator: 500 m de CPU e 2 Gi de memória.
  • 100 GB de armazenamento atribuídos a cada Pod através da app premium-rwo storageClass.
  • Tolerâncias, nodeAffinities e podAntiAffinities configurados para cada carga de trabalho, garantindo a distribuição adequada pelos nós, usando os respetivos node pools e zonas diferentes.
  • Comunicação no interior do cluster protegida por certificados autoassinados: autoridades de certificação (ACs) separadas para o cluster e os clientes (mTLS). Também pode configurar a utilização de uma autoridade de certificação diferente.

Esta configuração representa a configuração mínima necessária para criar um cluster Kafka pronto para produção. As secções seguintes demonstram configurações personalizadas para abordar aspetos como a segurança do cluster, as listas de controlo de acesso (ACLs), a gestão de tópicos, a gestão de certificados e muito mais.

Crie um cluster Kafka básico

  1. Crie um novo cluster do Kafka com a configuração básica:

    kubectl apply -n kafka -f kafka-strimzi/manifests/01-basic-cluster/my-cluster.yaml
    

    Este comando cria um recurso personalizado do Kafka do operador Strimzi que inclui pedidos e limites de CPU e memória, pedidos de armazenamento em blocos e uma combinação de manchas e afinidades para distribuir os pods aprovisionados pelos nós do Kubernetes.

  2. Aguarde alguns minutos enquanto o Kubernetes inicia as cargas de trabalho necessárias:

    kubectl wait kafka/my-cluster --for=condition=Ready --timeout=600s -n kafka
    
  3. Verifique se as cargas de trabalho do Kafka foram criadas:

    kubectl get pod,service,deploy,pdb -l=strimzi.io/cluster=my-cluster -n kafka
    

    O resultado é semelhante ao seguinte:

    NAME                                            READY   STATUS  RESTARTS   AGE
    pod/my-cluster-entity-operator-848698874f-j5m7f   3/3   Running   0        44m
    pod/my-cluster-kafka-0                          1/1   Running   0        5m
    pod/my-cluster-kafka-1                          1/1   Running   0        5m
    pod/my-cluster-kafka-2                          1/1   Running   0        5m
    pod/my-cluster-zookeeper-0                      1/1   Running   0        6m
    pod/my-cluster-zookeeper-1                      1/1   Running   0        6m
    pod/my-cluster-zookeeper-2                      1/1   Running   0        6m
    
    NAME                                TYPE      CLUSTER-IP   EXTERNAL-IP   PORT(S)                             AGE
    service/my-cluster-kafka-bootstrap  ClusterIP   10.52.8.80   <none>      9091/TCP,9092/TCP,9093/TCP          5m
    service/my-cluster-kafka-brokers    ClusterIP   None         <none>      9090/TCP,9091/TCP,9092/TCP,9093/TCP   5m
    service/my-cluster-zookeeper-client   ClusterIP   10.52.11.144   <none>      2181/TCP                            6m
    service/my-cluster-zookeeper-nodes  ClusterIP   None         <none>      2181/TCP,2888/TCP,3888/TCP          6m
    
    NAME                                       READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/my-cluster-entity-operator   1/1   1          1         44m
    
    NAME                                            MIN AVAILABLE   MAX UNAVAILABLE   ALLOWED DISRUPTIONS   AGE
    poddisruptionbudget.policy/my-cluster-kafka     2             N/A             1                   5m
    poddisruptionbudget.policy/my-cluster-zookeeper   2             N/A             1                   6m
    

O operador cria os seguintes recursos:

  • Dois StrimziPodSets para o Kafka e o ZooKeeper.
  • Três pods para réplicas de agente Kafka.
  • Três pods para réplicas do ZooKeeper.
  • Dois PodDisruptionBudgets, garantindo uma disponibilidade mínima de duas réplicas para a consistência do cluster.
  • Um serviço denominado my-cluster-kafka-bootstrap, que funciona como o servidor de arranque para clientes Kafka que se ligam a partir do cluster Kubernetes. Todos os ouvintes internos do Kafka estão disponíveis neste serviço.
  • Um serviço sem interface gráfica denominado my-cluster-kafka-brokers que permite a resolução de DNS dos endereços IP dos pods do agente Kafka diretamente. Este serviço é usado para comunicação entre corretores.
  • Um serviço denominado my-cluster-zookeeper-client que permite que os agentes Kafka se liguem aos nós do ZooKeeper como clientes.
  • Um serviço sem interface gráfica denominado my-cluster-zookeeper-nodes que permite a resolução de DNS dos endereços IP dos pods do ZooKeeper diretamente. Este serviço é usado para estabelecer ligação entre réplicas do ZooKeeper.
  • Uma implementação denominada my-cluster-entity-operator que contém o topic-operator e o user-operator e facilita a gestão de recursos personalizados KafkaTopics e KafkaUsers.

Também pode configurar dois NetworkPolicies para facilitar a conetividade aos ouvintes do Kafka a partir de qualquer Pod e espaço de nomes. Estas políticas também restringem as ligações ao ZooKeeper a corretores e ativam a comunicação entre os pods do cluster e as portas de serviço internas exclusivas da comunicação do cluster.

Autenticação e gestão de utilizadores

Esta secção mostra como ativar a autenticação e a autorização para proteger os ouvintes do Kafka e partilhar credenciais com os clientes.

O Strimzi oferece um método nativo do Kubernetes para a gestão de utilizadores através de um User Operator separado e do respetivo recurso personalizado do Kubernetes, KafkaUser, que define a configuração do utilizador. A configuração do utilizador inclui definições para autenticação e autorização, e aprovisiona o utilizador correspondente no Kafka.

O Strimzi pode criar ouvintes e utilizadores do Kafka que suportam vários mecanismos de autenticação como a autenticação baseada em nome de utilizador e palavra-passe (SCRAM-SHA-512) ou TLS. Também pode usar a autenticação OAuth 2.0, que é frequentemente considerada uma abordagem melhor em comparação com a utilização de palavras-passe ou certificados para autenticação devido à segurança e à gestão de credenciais externas.

Implemente um cluster do Kafka

Esta secção mostra como implementar um operador Strimzi que demonstra as capacidades de gestão de utilizadores, incluindo:

  • Um cluster Kafka com autenticação baseada em palavra-passe (SCRAM-SHA-512) ativada num dos ouvintes.
  • KafkaTopic com 3 réplicas.
  • Um KafkaUser com uma LCA que especifica que o utilizador tem autorizações de leitura e escrita para o tópico.
  1. Configure o cluster Kafka para usar um ouvinte com autenticação SCRAM-SHA-512 baseada em palavra-passe na porta 9094 e autorização simples:

    kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/my-cluster.yaml
    
  2. Crie um Topic, um User e um pod cliente para executar comandos no cluster Kafka:

    kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/topic.yaml
    kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/my-user.yaml
    

    O Secret my-user com as credenciais do utilizador é montado no pod do cliente como um volume.

    Estas credenciais confirmam que o utilizador tem autorizações para publicar mensagens no tópico através do ouvinte com a autenticação baseada em palavra-passe (SCRAM-SHA-512) ativada.

  3. Crie um pod de cliente:

    kubectl apply -n kafka -f kafka-strimzi/manifests/03-auth/kafkacat.yaml
    
  4. Aguarde alguns minutos até que o pod do cliente se torne Ready e, em seguida, estabeleça ligação ao mesmo:

    kubectl wait --for=condition=Ready pod --all -n kafka --timeout=600s
    kubectl exec -it kafkacat -n kafka -- /bin/sh
    
  5. Produza uma nova mensagem com credenciais my-user e tente consumi-la:

    echo "Message from my-user" |kcat \
      -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=SCRAM-SHA-512 \
      -X sasl.username=my-user \
      -X sasl.password=$(cat /my-user/password) \
      -t my-topic -P
    kcat -b my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9094 \
      -X security.protocol=SASL_SSL \
      -X sasl.mechanisms=SCRAM-SHA-512 \
      -X sasl.username=my-user \
      -X sasl.password=$(cat /my-user/password) \
      -t my-topic -C
    

    O resultado é semelhante ao seguinte:

    Message from my-user
    % Reached end of topic my-topic [0] at offset 0
    % Reached end of topic my-topic [2] at offset 1
    % Reached end of topic my-topic [1] at offset 0
    

    Escreva CTRL+C para parar o processo do consumidor.

  6. Saia da shell do Pod

    exit
    

Cópias de segurança e recuperação de desastres

Embora o operador Strimzi não ofereça uma funcionalidade de cópia de segurança integrada, pode implementar estratégias de cópia de segurança eficientes seguindo determinados padrões.

Pode usar a cópia de segurança do GKE para fazer uma cópia de segurança do seguinte:

  • Manifestos de recursos do Kubernetes.
  • Recursos personalizados da API Strimzi e respetivas definições extraídas do servidor da API Kubernetes do cluster em fase de cópia de segurança.
  • Volumes que correspondem aos recursos PersistentVolumeClaim encontrados nos manifestos.

Para mais informações sobre como fazer uma cópia de segurança e restaurar clusters do Kafka através da cópia de segurança para o GKE, consulte o artigo Prepare-se para a recuperação de desastres.

Também pode fazer uma cópia de segurança de um cluster Kafka implementado através do operador Strimzi. Deve fazer uma cópia de segurança:

  • A configuração do Kafka, que inclui todos os recursos personalizados da API Strimzi, como KafkaTopicse KafkaUsers.
  • Os dados, que são armazenados nos PersistentVolumes dos agentes Kafka.

O armazenamento de manifestos de recursos do Kubernetes, incluindo configurações do Strimzi, em repositórios Git pode eliminar a necessidade de uma cópia de segurança separada para a configuração do Kafka, porque os recursos podem ser reaplicados a um novo cluster do Kubernetes quando necessário.

Para salvaguardar a recuperação de dados do Kafka em cenários em que uma instância do servidor Kafka ou um cluster do Kubernetes onde o Kafka está implementado é perdido, recomendamos que configure a classe de armazenamento do Kubernetes usada para o aprovisionamento de volumes para agentes Kafka com a opção reclaimPolicy definida como Retain. Também recomendamos que tire capturas de ecrã dos volumes do agente Kafka.

O manifesto seguinte descreve uma StorageClass que usa a opção reclaimPolicy Retain:

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: premium-rwo-retain
...
reclaimPolicy: Retain
volumeBindingMode: WaitForFirstConsumer

O exemplo seguinte mostra a StorageClass adicionada ao spec de um recurso personalizado do cluster Kafka:

# ...
spec:
  kafka:
    # ...
    storage:
      type: persistent-claim
      size: 100Gi
      class: premium-rwo-retain

Com esta configuração, os PersistentVolumes aprovisionados através da classe de armazenamento não são eliminados, mesmo quando o PersistentVolumeClaim correspondente é eliminado.

Para recuperar a instância do Kafka num novo cluster do Kubernetes com a configuração existente e os dados da instância do agente:

  1. Aplique os recursos personalizados do Strimzi Kafka existentes (Kakfa, KafkaTopic, KafkaUser, etc.) a um novo cluster do Kubernetes
  2. Atualize os PersistentVolumeClaims com o nome das novas instâncias do agente Kafka para os PersistentVolumes antigos através da propriedade spec.volumeName no PersistentVolumeClaim.