Atualizar um grupo de consumidores do Google Cloud Managed Service para Apache Kafka

É possível atualizar um grupo de consumidores do Serviço Gerenciado do Google Cloud para Apache Kafka para modificar os deslocamentos de uma lista de partições de tópicos. Isso permite controlar quais mensagens os consumidores do grupo recebem.

Para atualizar um grupo de consumidores, use a Google Cloud CLI, a biblioteca de cliente, a API Managed Kafka ou as APIs de código aberto do Apache Kafka. O Google Cloud console não é compatível com a edição de um grupo de consumidores.

Antes de começar

Para atualizar um grupo de consumidores, primeiro verifique se ele não está consumindo mensagens ativamente. Um grupo de consumidores é excluído automaticamente pelo Kafka se nunca consumiu mensagens ou quando o último deslocamento confirmado expira após offsets.retention.minutes.

Siga estas etapas antes de atualizar um grupo de consumidores:

  1. Envie algumas mensagens para o tópico em que o grupo de consumidores está lendo mensagens.

  2. Inicie o grupo de consumidores para processar algumas mensagens.

  3. Impeça que todos os consumidores consumam mensagens. Para interromper um consumidor, pressione Control+C.

Para mais informações sobre como enviar e consumir mensagens, consulte Produzir e consumir mensagens com as ferramentas de linha de comando do Kafka.

Papéis e permissões necessários para atualizar um grupo de consumidores

Para receber as permissões necessárias para editar seus grupos de consumidores, peça ao administrador para conceder a você o papel do IAM de Editor de grupos de consumidores do Managed Kafka (roles/managedkafka.consumerGroupEditor) no seu projeto. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Esse papel predefinido contém as permissões necessárias para editar seus grupos de consumidores. Para acessar as permissões exatas que são necessárias, expanda a seção Permissões necessárias:

Permissões necessárias

As seguintes permissões são necessárias para editar seus grupos de consumidores:

  • Atualizar grupos de consumidores: managedkafka.consumerGroups.update

Essas permissões também podem ser concedidas com funções personalizadas ou outros papéis predefinidos.

Para mais informações sobre o papel de Editor de grupos de consumidores do Managed Kafka, consulte Papéis predefinidos do Serviço Gerenciado para Apache Kafka.

Conceder acesso de LEITURA ao agente de serviço

Para atualizar os deslocamentos do grupo de consumidores, o agente de serviço precisa de acesso à operação de LEITURA nos recursos de tópico e grupo de consumidores. Esse acesso é configurado com ACLs do Apache Kafka.

Se você não tiver configurado nenhuma ACL do Apache Kafka para o grupo de consumidores e o tópico no cluster, o agente de serviço terá acesso ambiente a esses recursos. Você pode pular esta seção.

Se as ACLs do Apache Kafka estiverem configuradas para o grupo de consumidores e o tópico no cluster, o agente de serviço vai precisar de acesso explícito à ACL para a operação de LEITURA dos dois recursos. Para fazer isso, adicione entradas de ACL que concedam ao agente de serviço acesso à operação de LEITURA no grupo de consumidores e no tópico relevantes. Siga estas etapas:

  1. Instale a Google Cloud CLI.

  2. Ao usar um provedor de identidade (IdP) externo, primeiro faça login na gcloud CLI com sua identidade federada.

  3. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  4. Execute o comando gcloud managed-kafka acls add-acl-entry:

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    Substitua:

    • CONSUMER_GROUP_ACL_ID (obrigatório): o ID exclusivo do recurso de ACL do Serviço Gerenciado para Apache Kafka em que você quer adicionar a entrada de ACL para o grupo de consumidores. Para aplicar o acesso a todos os grupos de consumidores, use `allConsumerGroups`. Para um grupo de consumidores específico, use `consumerGroup/CONSUMER_GROUP_NAME`.
    • TOPIC_ACL_ID (obrigatório): o ID exclusivo do recurso de ACL do Serviço Gerenciado para Apache Kafka em que você quer adicionar a entrada de ACL para o tópico. Para aplicar o acesso a todos os tópicos, use `allTopics`. Para um tópico específico, use `topic/TOPIC_NAME`.
    • CLUSTER_ID (obrigatório): o ID do cluster que contém o recurso de ACL.
    • LOCATION (obrigatório): a região em que o cluster está localizado. Consulte locais compatíveis.
    • PROJECT_NUMBER (obrigatório): o número do projeto de o projeto em que o cluster está localizado. Ele é usado para criar o nome principal do agente de serviço para a entrada de ACL.

Para mais informações sobre como adicionar uma entrada de ACL, consulte Adicionar uma entrada de ACL.

Atualizar um grupo de consumidores

Verifique se você concluiu as etapas na seção Antes de começar.

Para atualizar um grupo de consumidores, siga estas etapas:

gcloud

  1. No Google Cloud console do Cloud, ative o Cloud Shell.

    Ativar o Cloud Shell

    Na parte de baixo do Google Cloud console do Cloud, uma sessão do Cloud Shell é iniciada e exibe um prompt de linha de comando. O Cloud Shell é um ambiente shell com a Google Cloud CLI já instalada e com valores já definidos para o projeto atual. A inicialização da sessão pode levar alguns segundos.

  2. Execute o comando gcloud managed-kafka consumer-groups update:

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    Substitua:

    • CLUSTER_ID: o ID ou nome do cluster.

    • LOCATION: o local do cluster.

    • CONSUMER_GROUP_ID: o ID ou nome do grupo de consumidores.

    • TOPICS_FILE: essa configuração especifica o local do arquivo que contém a configuração dos tópicos a serem atualizados para o grupo de consumidores. O arquivo pode estar no formato JSON ou YAML. Ele pode ser um caminho de arquivo ou incluir diretamente o conteúdo JSON ou YAML.

      O arquivo de tópico usa uma estrutura JSON para representar um mapa de tópicos ConsumerGroup, no formato { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Para cada tópico, ConsumerPartitionMetadata fornece o deslocamento e os metadados de cada partição.

      Para definir o deslocamento de uma única partição (partição 0) em um tópico chamado topic1 como 10, a configuração JSON seria:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      Confira a seguir um exemplo do conteúdo de um arquivo topics.json:

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: ao especificar tópicos em um arquivo JSON ou YAML, inclua o caminho completo do tópico, que pode ser obtido executando o comando gcloud managed-kafak topics describe e do formato projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// consumerGroupID := "my-consumer-group"
	// topicPath := "my-topic-path"
	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)

	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
	for partition, offset := range partitionOffsets {
		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
			Offset: offset,
		}
	}
	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
		topicPath: {
			Partitions: partitionMetadata,
		},
	}
	consumerGroupConfig := managedkafkapb.ConsumerGroup{
		Name:   consumerGroupPath,
		Topics: topicConfig,
	}
	paths := []string{"topics"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateConsumerGroupRequest{
		UpdateMask:    updateMask,
		ConsumerGroup: &consumerGroupConfig,
	}
	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
	}
	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
	return nil
}

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConsumerGroup;
import com.google.cloud.managedkafka.v1.ConsumerGroupName;
import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class UpdateConsumerGroup {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    String consumerGroupId = "my-consumer-group";
    Map<Integer, Integer> partitionOffsets =
        new HashMap<Integer, Integer>() {
          {
            put(1, 10);
            put(2, 20);
            put(3, 30);
          }
        };
    updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
  }

  public static void updateConsumerGroup(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      String consumerGroupId,
      Map<Integer, Integer> partitionOffsets)
      throws Exception {
    TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
    ConsumerGroupName consumerGroupName =
        ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);

    Map<Integer, ConsumerPartitionMetadata> partitions =
        new HashMap<Integer, ConsumerPartitionMetadata>() {
          {
            for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
              ConsumerPartitionMetadata partitionMetadata =
                  ConsumerPartitionMetadata.newBuilder()
                      .setOffset(partitionOffset.getValue())
                      .build();
              put(partitionOffset.getKey(), partitionMetadata);
            }
          }
        };
    ConsumerTopicMetadata topicMetadata =
        ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
    ConsumerGroup consumerGroup =
        ConsumerGroup.newBuilder()
            .setName(consumerGroupName.toString())
            .putTopics(topicName.toString(), topicMetadata)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateConsumerGroupRequest request =
          UpdateConsumerGroupRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConsumerGroup(consumerGroup)
              .build();
      // This operation is being handled synchronously.
      ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
      System.out.printf("Updated consumer group: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
    }
  }
}

Python

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# consumer_group_id = "my-consumer-group"
# topic_path = "my-topic-path"
# partition_offsets = {10: 10}

client = managedkafka_v1.ManagedKafkaClient()

consumer_group = managedkafka_v1.ConsumerGroup()
consumer_group.name = client.consumer_group_path(
    project_id, region, cluster_id, consumer_group_id
)

topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
for partition, offset in partition_offsets.items():
    partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
    topic_metadata.partitions[partition] = partition_metadata
consumer_group.topics = {
    topic_path: topic_metadata,
}

update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("topics")

request = managedkafka_v1.UpdateConsumerGroupRequest(
    update_mask=update_mask,
    consumer_group=consumer_group,
)

try:
    response = client.update_consumer_group(request=request)
    print("Updated consumer group:", response)
except NotFound as e:
    print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")

A seguir

Apache Kafka® é uma marca registrada da The Apache Software Foundation ou afiliadas nos Estados Unidos e/ou em outros países.