É possível atualizar um grupo de consumidores do Google Cloud Managed Service para Apache Kafka e modificar os offsets de uma lista de partições de tópico. Isso permite controlar quais mensagens os consumidores do grupo recebem.
Para atualizar um grupo de consumidores, use a Google Cloud CLI, a biblioteca de cliente, a API Managed Kafka ou as APIs Apache Kafka de código aberto. O console Google Cloud não é compatível com a edição de um grupo de consumidores.
Antes de começar
Para atualizar um grupo de consumidores, primeiro verifique se ele não está consumindo mensagens ativamente.
Um grupo de consumidores é excluído automaticamente pelo Kafka se nunca tiver consumido mensagens ou quando o último deslocamento confirmado expirar após offsets.retention.minutes.
Siga estas etapas antes de atualizar um grupo de consumidores:
Envie algumas mensagens para o tópico de que seu grupo de consumidores está lendo mensagens.
Inicie seu grupo de consumidores para processar algumas mensagens.
Impeça que todos os seus consumidores consumam mensagens. Para interromper um consumidor, pressione Control+C.
Para mais informações sobre como enviar e consumir mensagens, consulte Produzir e consumir mensagens com as ferramentas de linha de comando do Kafka.
Papéis e permissões necessários para atualizar um grupo de consumidores
Para receber as permissões necessárias
para editar seus grupos de consumidores,
peça ao administrador para conceder a você o
papel do IAM de Editor de grupos de consumidores gerenciados do Kafka (roles/managedkafka.consumerGroupEditor)
no projeto.
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Esse papel predefinido contém as permissões necessárias para editar seus grupos de consumidores. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:
Permissões necessárias
As seguintes permissões são necessárias para editar seus grupos de consumidores:
-
Atualize os grupos de consumidores:
managedkafka.consumerGroups.update
Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.
Para mais informações sobre o papel de editor do grupo de consumidores gerenciados do Kafka, consulte Papéis predefinidos do Managed Service para Apache Kafka.
Conceder acesso de LEITURA ao agente de serviço
Para atualizar os offsets do grupo de consumidores, o agente de serviço precisa de acesso à operação READ nos recursos de tópico e grupo de consumidores. Esse acesso é configurado com ACLs do Apache Kafka.
Se você não tiver configurado ACLs do Apache Kafka para o grupo de consumidores e o tópico dele no cluster, o agente de serviço terá acesso ambiente a esses recursos. Você pode pular esta seção.
Se as ACLs do Apache Kafka estiverem configuradas para o grupo de consumidores e o tópico dele no cluster, o agente de serviço vai exigir acesso explícito à ACL para a operação READ dos dois recursos. Para isso, adicione entradas de ACL que concedam ao agente de serviço acesso à operação READ no grupo de consumidores e no tópico relevantes. Siga estas etapas:
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init Execute o comando
gcloud managed-kafka acls add-acl-entry:gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=* gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=*
Substitua:
CONSUMER_GROUP_ACL_ID(obrigatório): o ID exclusivo do recurso de ACL do Serviço gerenciado para Apache Kafka em que você quer adicionar a entrada de ACL do grupo de consumidores. Para aplicar o acesso a todos os grupos de consumidores, use "allConsumerGroups". Para um grupo específico, use "consumerGroup/CONSUMER_GROUP_NAME".TOPIC_ACL_ID(obrigatório): o ID exclusivo do recurso de ACL do Serviço gerenciado para Apache Kafka em que você quer adicionar a entrada de ACL do tópico. Para aplicar o acesso a todos os tópicos, use "allTopics". Para um tópico específico, use "topic/TOPIC_NAME".CLUSTER_ID(obrigatório): o ID do cluster que contém o recurso de ACL.LOCATION(obrigatório): a região em que o cluster está localizado. Consulte Locais compatíveis.PROJECT_NUMBER(obrigatório): o número do projeto em que o cluster está localizado. Isso é usado para criar o nome principal do agente de serviço para a entrada da ACL.
Para mais informações sobre como adicionar uma entrada de ACL, consulte Adicionar uma entrada de ACL.
Atualizar um grupo de consumidores
Verifique se você concluiu as etapas na seção Antes de começar.
Para atualizar um grupo de consumidores, siga estas etapas:
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Execute o comando
gcloud managed-kafka consumer-groups update:gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --topics-file=TOPICS_FILE
Substitua:
-
CLUSTER_ID: o ID ou nome do cluster.
-
LOCATION: o local do cluster.
-
CONSUMER_GROUP_ID: o ID ou nome do grupo de consumidores.
-
TOPICS_FILE: essa configuração especifica o local do arquivo que contém a configuração dos tópicos a serem atualizados para o grupo de consumidores. O arquivo pode estar no formato JSON ou YAML. Pode ser um caminho de arquivo ou incluir diretamente o conteúdo JSON ou YAML.
O arquivo de tema usa uma estrutura JSON para representar um mapa de temas
ConsumerGroup, na forma{ topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Para cada tópico,ConsumerPartitionMetadatafornece o deslocamento e os metadados de cada partição.Para definir o deslocamento de uma única partição (partição 0) em um tópico chamado
topic1como 10, a configuração JSON seria assim:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}Confira abaixo um exemplo do conteúdo de um arquivo
topics.json:{ "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" }, "2": { "offset": "1", "metadata": "metadata" } } }, "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" } } } }
-
TOPIC_PATH: ao especificar tópicos em um arquivo JSON ou YAML, inclua o caminho completo do tópico, que pode ser obtido executando o comando
gcloud managed-kafak topics describee tem o formatoprojects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .
-