É possível atualizar um grupo de consumidores do Serviço Gerenciado do Google Cloud para Apache Kafka para modificar os deslocamentos de uma lista de partições de tópicos. Isso permite controlar quais mensagens os consumidores do grupo recebem.
Para atualizar um grupo de consumidores, use a Google Cloud CLI, a biblioteca de cliente, a API Managed Kafka ou as APIs de código aberto do Apache Kafka. O Google Cloud console não é compatível com a edição de um grupo de consumidores.
Antes de começar
Para atualizar um grupo de consumidores, primeiro verifique se ele não está consumindo mensagens ativamente.
Um grupo de consumidores é excluído automaticamente pelo Kafka se nunca consumiu mensagens ou quando o último deslocamento confirmado expira após offsets.retention.minutes.
Siga estas etapas antes de atualizar um grupo de consumidores:
Envie algumas mensagens para o tópico em que o grupo de consumidores está lendo mensagens.
Inicie o grupo de consumidores para processar algumas mensagens.
Impeça que todos os consumidores consumam mensagens. Para interromper um consumidor, pressione Control+C.
Para mais informações sobre como enviar e consumir mensagens, consulte Produzir e consumir mensagens com as ferramentas de linha de comando do Kafka.
Papéis e permissões necessários para atualizar um grupo de consumidores
Para receber as permissões necessárias para editar seus grupos de consumidores, peça ao administrador para conceder a você o papel do IAM de Editor de grupos de consumidores do Managed Kafka (roles/managedkafka.consumerGroupEditor) no seu projeto.
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Esse papel predefinido contém as permissões necessárias para editar seus grupos de consumidores. Para acessar as permissões exatas que são necessárias, expanda a seção Permissões necessárias:
Permissões necessárias
As seguintes permissões são necessárias para editar seus grupos de consumidores:
-
Atualizar grupos de consumidores:
managedkafka.consumerGroups.update
Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.
Para mais informações sobre o papel de Editor de grupos de consumidores do Managed Kafka, consulte Papéis predefinidos do Serviço Gerenciado para Apache Kafka.
Conceder acesso de LEITURA ao agente de serviço
Para atualizar os deslocamentos do grupo de consumidores, o agente de serviço precisa de acesso à operação de LEITURA nos recursos de tópico e grupo de consumidores. Esse acesso é configurado com ACLs do Apache Kafka.
Se você não tiver configurado nenhuma ACL do Apache Kafka para o grupo de consumidores e o tópico no cluster, o agente de serviço terá acesso ambiente a esses recursos. Você pode pular esta seção.
Se as ACLs do Apache Kafka estiverem configuradas para o grupo de consumidores e o tópico no cluster, o agente de serviço vai precisar de acesso explícito à ACL para a operação de LEITURA dos dois recursos. Para fazer isso, adicione entradas de ACL que concedam ao agente de serviço acesso à operação de LEITURA no grupo de consumidores e no tópico relevantes. Siga estas etapas:
-
Instale a Google Cloud CLI.
-
Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.
-
Para inicializar a CLI gcloud, execute o seguinte comando:
gcloud init Execute o comando
gcloud managed-kafka acls add-acl-entry:gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=* gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=*
Substitua:
CONSUMER_GROUP_ACL_ID(obrigatório): o ID exclusivo do recurso de ACL do Serviço Gerenciado para Apache Kafka em que você quer adicionar a entrada de ACL para o grupo de consumidores. Para aplicar o acesso a todos os grupos de consumidores, use `allConsumerGroups`. Para um grupo de consumidores específico, use `consumerGroup/CONSUMER_GROUP_NAME`.TOPIC_ACL_ID(obrigatório): o ID exclusivo do recurso de ACL do Serviço Gerenciado para Apache Kafka em que você quer adicionar a entrada de ACL para o tópico. Para aplicar o acesso a todos os tópicos, use `allTopics`. Para um tópico específico, use `topic/TOPIC_NAME`.CLUSTER_ID(obrigatório): o ID do cluster que contém o recurso de ACL.LOCATION(obrigatório): a região em que o cluster está localizado. Consulte locais compatíveis.PROJECT_NUMBER(obrigatório): o número do projeto de o projeto em que o cluster está localizado. Ele é usado para criar o nome principal do agente de serviço para a entrada de ACL.
Para mais informações sobre como adicionar uma entrada de ACL, consulte Adicionar uma entrada de ACL.
Atualizar um grupo de consumidores
Verifique se você concluiu as etapas na seção Antes de começar.
Para atualizar um grupo de consumidores, siga estas etapas:
gcloud
-
No Google Cloud console do Cloud, ative o Cloud Shell.
Na parte de baixo do Google Cloud console do Cloud, uma sessão do Cloud Shell é iniciada e exibe um prompt de linha de comando. O Cloud Shell é um ambiente shell com a Google Cloud CLI já instalada e com valores já definidos para o projeto atual. A inicialização da sessão pode levar alguns segundos.
Execute o comando
gcloud managed-kafka consumer-groups update:gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --topics-file=TOPICS_FILE
Substitua:
-
CLUSTER_ID: o ID ou nome do cluster.
-
LOCATION: o local do cluster.
-
CONSUMER_GROUP_ID: o ID ou nome do grupo de consumidores.
-
TOPICS_FILE: essa configuração especifica o local do arquivo que contém a configuração dos tópicos a serem atualizados para o grupo de consumidores. O arquivo pode estar no formato JSON ou YAML. Ele pode ser um caminho de arquivo ou incluir diretamente o conteúdo JSON ou YAML.
O arquivo de tópico usa uma estrutura JSON para representar um mapa de tópicos
ConsumerGroup, no formato{ topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Para cada tópico,ConsumerPartitionMetadatafornece o deslocamento e os metadados de cada partição.Para definir o deslocamento de uma única partição (partição 0) em um tópico chamado
topic1como 10, a configuração JSON seria:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}Confira a seguir um exemplo do conteúdo de um arquivo
topics.json:{ "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" }, "2": { "offset": "1", "metadata": "metadata" } } }, "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" } } } }
-
TOPIC_PATH: ao especificar tópicos em um arquivo JSON ou YAML, inclua o caminho completo do tópico, que pode ser obtido executando o comando
gcloud managed-kafak topics describee do formatoprojects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .
-