Atualizar um grupo de consumidores do Google Cloud Managed Service para Apache Kafka

É possível atualizar um grupo de consumidores do Google Cloud Managed Service para Apache Kafka e modificar os offsets de uma lista de partições de tópico. Isso permite controlar quais mensagens os consumidores do grupo recebem.

Para atualizar um grupo de consumidores, use a Google Cloud CLI, a biblioteca de cliente, a API Managed Kafka ou as APIs Apache Kafka de código aberto. O console Google Cloud não é compatível com a edição de um grupo de consumidores.

Antes de começar

Para atualizar um grupo de consumidores, primeiro verifique se ele não está consumindo mensagens ativamente. Um grupo de consumidores é excluído automaticamente pelo Kafka se nunca tiver consumido mensagens ou quando o último deslocamento confirmado expirar após offsets.retention.minutes.

Siga estas etapas antes de atualizar um grupo de consumidores:

  1. Envie algumas mensagens para o tópico de que seu grupo de consumidores está lendo mensagens.

  2. Inicie seu grupo de consumidores para processar algumas mensagens.

  3. Impeça que todos os seus consumidores consumam mensagens. Para interromper um consumidor, pressione Control+C.

Para mais informações sobre como enviar e consumir mensagens, consulte Produzir e consumir mensagens com as ferramentas de linha de comando do Kafka.

Papéis e permissões necessários para atualizar um grupo de consumidores

Para receber as permissões necessárias para editar seus grupos de consumidores, peça ao administrador para conceder a você o papel do IAM de Editor de grupos de consumidores gerenciados do Kafka (roles/managedkafka.consumerGroupEditor) no projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para editar seus grupos de consumidores. Para acessar as permissões exatas necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para editar seus grupos de consumidores:

  • Atualize os grupos de consumidores: managedkafka.consumerGroups.update

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

Para mais informações sobre o papel de editor do grupo de consumidores gerenciados do Kafka, consulte Papéis predefinidos do Managed Service para Apache Kafka.

Conceder acesso de LEITURA ao agente de serviço

Para atualizar os offsets do grupo de consumidores, o agente de serviço precisa de acesso à operação READ nos recursos de tópico e grupo de consumidores. Esse acesso é configurado com ACLs do Apache Kafka.

Se você não tiver configurado ACLs do Apache Kafka para o grupo de consumidores e o tópico dele no cluster, o agente de serviço terá acesso ambiente a esses recursos. Você pode pular esta seção.

Se as ACLs do Apache Kafka estiverem configuradas para o grupo de consumidores e o tópico dele no cluster, o agente de serviço vai exigir acesso explícito à ACL para a operação READ dos dois recursos. Para isso, adicione entradas de ACL que concedam ao agente de serviço acesso à operação READ no grupo de consumidores e no tópico relevantes. Siga estas etapas:

  1. Install the Google Cloud CLI.

  2. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Execute o comando gcloud managed-kafka acls add-acl-entry:

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    Substitua:

    • CONSUMER_GROUP_ACL_ID (obrigatório): o ID exclusivo do recurso de ACL do Serviço gerenciado para Apache Kafka em que você quer adicionar a entrada de ACL do grupo de consumidores. Para aplicar o acesso a todos os grupos de consumidores, use "allConsumerGroups". Para um grupo específico, use "consumerGroup/CONSUMER_GROUP_NAME".
    • TOPIC_ACL_ID (obrigatório): o ID exclusivo do recurso de ACL do Serviço gerenciado para Apache Kafka em que você quer adicionar a entrada de ACL do tópico. Para aplicar o acesso a todos os tópicos, use "allTopics". Para um tópico específico, use "topic/TOPIC_NAME".
    • CLUSTER_ID (obrigatório): o ID do cluster que contém o recurso de ACL.
    • LOCATION (obrigatório): a região em que o cluster está localizado. Consulte Locais compatíveis.
    • PROJECT_NUMBER (obrigatório): o número do projeto em que o cluster está localizado. Isso é usado para criar o nome principal do agente de serviço para a entrada da ACL.

Para mais informações sobre como adicionar uma entrada de ACL, consulte Adicionar uma entrada de ACL.

Atualizar um grupo de consumidores

Verifique se você concluiu as etapas na seção Antes de começar.

Para atualizar um grupo de consumidores, siga estas etapas:

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Execute o comando gcloud managed-kafka consumer-groups update:

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    Substitua:

    • CLUSTER_ID: o ID ou nome do cluster.

    • LOCATION: o local do cluster.

    • CONSUMER_GROUP_ID: o ID ou nome do grupo de consumidores.

    • TOPICS_FILE: essa configuração especifica o local do arquivo que contém a configuração dos tópicos a serem atualizados para o grupo de consumidores. O arquivo pode estar no formato JSON ou YAML. Pode ser um caminho de arquivo ou incluir diretamente o conteúdo JSON ou YAML.

      O arquivo de tema usa uma estrutura JSON para representar um mapa de temas ConsumerGroup, na forma { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Para cada tópico, ConsumerPartitionMetadata fornece o deslocamento e os metadados de cada partição.

      Para definir o deslocamento de uma única partição (partição 0) em um tópico chamado topic1 como 10, a configuração JSON seria assim:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      Confira abaixo um exemplo do conteúdo de um arquivo topics.json:

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: ao especificar tópicos em um arquivo JSON ou YAML, inclua o caminho completo do tópico, que pode ser obtido executando o comando gcloud managed-kafak topics describe e tem o formato projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .

  3. Go

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// consumerGroupID := "my-consumer-group"
    	// topicPath := "my-topic-path"
    	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)
    
    	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
    	for partition, offset := range partitionOffsets {
    		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
    			Offset: offset,
    		}
    	}
    	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
    		topicPath: {
    			Partitions: partitionMetadata,
    		},
    	}
    	consumerGroupConfig := managedkafkapb.ConsumerGroup{
    		Name:   consumerGroupPath,
    		Topics: topicConfig,
    	}
    	paths := []string{"topics"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConsumerGroupRequest{
    		UpdateMask:    updateMask,
    		ConsumerGroup: &consumerGroupConfig,
    	}
    	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
    	return nil
    }
    

    Java

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConsumerGroup;
    import com.google.cloud.managedkafka.v1.ConsumerGroupName;
    import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
    import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConsumerGroup {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        String consumerGroupId = "my-consumer-group";
        Map<Integer, Integer> partitionOffsets =
            new HashMap<Integer, Integer>() {
              {
                put(1, 10);
                put(2, 20);
                put(3, 30);
              }
            };
        updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
      }
    
      public static void updateConsumerGroup(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          String consumerGroupId,
          Map<Integer, Integer> partitionOffsets)
          throws Exception {
        TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
        ConsumerGroupName consumerGroupName =
            ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);
    
        Map<Integer, ConsumerPartitionMetadata> partitions =
            new HashMap<Integer, ConsumerPartitionMetadata>() {
              {
                for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
                  ConsumerPartitionMetadata partitionMetadata =
                      ConsumerPartitionMetadata.newBuilder()
                          .setOffset(partitionOffset.getValue())
                          .build();
                  put(partitionOffset.getKey(), partitionMetadata);
                }
              }
            };
        ConsumerTopicMetadata topicMetadata =
            ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
        ConsumerGroup consumerGroup =
            ConsumerGroup.newBuilder()
                .setName(consumerGroupName.toString())
                .putTopics(topicName.toString(), topicMetadata)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateConsumerGroupRequest request =
              UpdateConsumerGroupRequest.newBuilder()
                  .setUpdateMask(updateMask)
                  .setConsumerGroup(consumerGroup)
                  .build();
          // This operation is being handled synchronously.
          ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
          System.out.printf("Updated consumer group: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # consumer_group_id = "my-consumer-group"
    # topic_path = "my-topic-path"
    # partition_offsets = {10: 10}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    consumer_group = managedkafka_v1.ConsumerGroup()
    consumer_group.name = client.consumer_group_path(
        project_id, region, cluster_id, consumer_group_id
    )
    
    topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
    for partition, offset in partition_offsets.items():
        partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
        topic_metadata.partitions[partition] = partition_metadata
    consumer_group.topics = {
        topic_path: topic_metadata,
    }
    
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("topics")
    
    request = managedkafka_v1.UpdateConsumerGroupRequest(
        update_mask=update_mask,
        consumer_group=consumer_group,
    )
    
    try:
        response = client.update_consumer_group(request=request)
        print("Updated consumer group:", response)
    except NotFound as e:
        print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")
    

A seguir

Apache Kafka® é uma marca registrada da The Apache Software Foundation ou afiliadas nos Estados Unidos e/ou em outros países.