Mettre à jour un groupe de consommateurs Google Cloud Managed Service pour Apache Kafka

Vous pouvez mettre à jour un groupe de consommateurs Google Cloud Managed Service pour Apache Kafka afin de modifier les décalages d'une liste de partitions de sujet. Cela vous permet de contrôler les messages que les consommateurs du groupe reçoivent.

Pour mettre à jour un groupe de consommateurs, vous pouvez utiliser la Google Cloud CLI, la bibliothèque cliente, l'API Managed Kafka ou les API Apache Kafka Open Source. La Google Cloud console n'est pas compatible avec la modification d'un groupe de consommateurs.

Avant de commencer

Pour mettre à jour un groupe de consommateurs, assurez-vous d'abord qu'il ne consomme pas activement de messages. Un groupe de consommateurs est automatiquement supprimé par Kafka s'il n'a jamais consommé de messages ou lorsque le dernier décalage validé a expiré après offsets.retention.minutes.

Avant de mettre à jour un groupe de consommateurs, procédez comme suit :

  1. Envoyez des messages au sujet à partir duquel votre groupe de consommateurs lit les messages.

  2. Démarrez votre groupe de consommateurs pour traiter quelques messages.

  3. Empêchez tous vos consommateurs de consommer des messages. Pour arrêter un consommateur, appuyez sur Control+C.

Pour en savoir plus sur l'envoi et la consommation de messages, consultez Produire et consommer des messages avec les outils de ligne de commande Kafka.

Rôles et autorisations requis pour mettre à jour un groupe de consommateurs

Pour obtenir les autorisations nécessaires pour modifier vos groupes de consommateurs, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de groupes de consommateurs Managed Kafka (roles/managedkafka.consumerGroupEditor) sur votre projet. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour modifier vos groupes de consommateurs. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour modifier vos groupes de consommateurs :

  • Mettre à jour des groupes de consommateurs : managedkafka.consumerGroups.update

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Pour en savoir plus sur le rôle "Éditeur de groupes de consommateurs Managed Kafka", consultez Rôles prédéfinis Managed Service pour Apache Kafka.

Accorder l'accès en lecture à l'agent de service

Pour mettre à jour les décalages des groupes de consommateurs, l'agent de service doit avoir accès à l'opération READ sur les ressources de sujet et de groupe de consommateurs. Cet accès est configuré avec les LCA Apache Kafka.

Si vous n'avez configuré aucune LCA Apache Kafka pour le groupe de consommateurs et son sujet dans le cluster, l'agent de service dispose d'un accès ambiant à ces ressources. Vous pouvez ignorer cette section.

Si des LCA Apache Kafka sont configurées pour le groupe de consommateurs et son sujet dans le cluster, l'agent de service nécessite un accès LCA explicite pour l'opération READ pour les deux ressources. Pour ce faire, ajoutez des entrées de LCA accordant à l'agent de service l'accès à l'opération READ sur le groupe de consommateurs et le sujet concernés. Procédez comme suit :

  1. Installez la Google Cloud CLI.

  2. Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.

  3. Pour initialiser la gcloud CLI, exécutez la commande suivante :

    gcloud init
  4. Exécutez la gcloud managed-kafka acls add-acl-entry commande :

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    Remplacez les éléments suivants :

    • CONSUMER_GROUP_ACL_ID (obligatoire) : ID unique de la ressource LCA Managed Service pour Apache Kafka à laquelle vous souhaitez ajouter l'entrée de LCA pour le groupe de consommateurs. Pour appliquer l'accès à tous les groupes de consommateurs, utilisez `allConsumerGroups`. Pour un groupe de consommateurs spécifique, utilisez `consumerGroup/CONSUMER_GROUP_NAME`.
    • TOPIC_ACL_ID (obligatoire) : ID unique de la ressource LCA Managed Service pour Apache Kafka à laquelle vous souhaitez ajouter l'entrée de LCA pour le sujet. Pour appliquer l'accès à tous les sujets, utilisez `allTopics`. Pour un sujet spécifique, utilisez `topic/TOPIC_NAME`.
    • CLUSTER_ID (obligatoire) : ID du cluster contenant la ressource LCA.
    • LOCATION (obligatoire) : région dans laquelle se trouve le cluster. Consultez les régions compatibles.
    • PROJECT_NUMBER (obligatoire) : numéro du projet dans lequel se trouve le cluster. Il est utilisé pour créer le nom principal de l'agent de service pour l'entrée de LCA.

Pour en savoir plus sur l'ajout d'une entrée de LCA, consultez Ajouter une entrée de LCA.

Mettre à jour un groupe de consommateurs

Assurez-vous d'avoir suivi les étapes de la section Avant de commencer.

Pour mettre à jour un groupe de consommateurs, procédez comme suit :

gcloud

  1. Dans la Google Cloud console, activez Cloud Shell.

    Activer Cloud Shell

    En bas de la Google Cloud console, une session Cloud Shell démarre et affiche une invite de ligne de commande. Cloud Shell est un environnement de shell dans lequel Google Cloud CLI est déjà installé, et dans lequel des valeurs sont déjà définies pour votre projet actuel. L'initialisation de la session peut prendre quelques secondes.

  2. Exécutez la gcloud managed-kafka consumer-groups update commande :

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    Remplacez les éléments suivants :

    • CLUSTER_ID : ID ou nom du cluster.

    • LOCATION : emplacement du cluster.

    • CONSUMER_GROUP_ID : ID ou nom du groupe de consommateurs.

    • TOPICS_FILE : ce paramètre spécifie l'emplacement du fichier contenant la configuration des sujets à mettre à jour pour le groupe de consommateurs. Le fichier peut être au format JSON ou YAML. Il peut s'agir d'un chemin d'accès à un fichier ou inclure directement le contenu JSON ou YAML.

      Le fichier de sujet utilise une structure JSON pour représenter une carte de sujets ConsumerGroup, au format { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Pour chaque sujet, ConsumerPartitionMetadata fournit le décalage et les métadonnées de chaque partition.

      Pour définir le décalage d'une seule partition (partition 0) dans un sujet nommé topic1 sur 10, la configuration JSON se présente comme suit :{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      Voici un exemple du contenu d'un fichier topics.json :

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH : lorsque vous spécifiez des sujets dans un fichier JSON ou YAML, incluez le chemin d'accès complet au sujet, que vous pouvez obtenir en exécutant la commande gcloud managed-kafak topics describe et au format projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// consumerGroupID := "my-consumer-group"
	// topicPath := "my-topic-path"
	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)

	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
	for partition, offset := range partitionOffsets {
		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
			Offset: offset,
		}
	}
	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
		topicPath: {
			Partitions: partitionMetadata,
		},
	}
	consumerGroupConfig := managedkafkapb.ConsumerGroup{
		Name:   consumerGroupPath,
		Topics: topicConfig,
	}
	paths := []string{"topics"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateConsumerGroupRequest{
		UpdateMask:    updateMask,
		ConsumerGroup: &consumerGroupConfig,
	}
	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
	}
	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
	return nil
}

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConsumerGroup;
import com.google.cloud.managedkafka.v1.ConsumerGroupName;
import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class UpdateConsumerGroup {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    String consumerGroupId = "my-consumer-group";
    Map<Integer, Integer> partitionOffsets =
        new HashMap<Integer, Integer>() {
          {
            put(1, 10);
            put(2, 20);
            put(3, 30);
          }
        };
    updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
  }

  public static void updateConsumerGroup(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      String consumerGroupId,
      Map<Integer, Integer> partitionOffsets)
      throws Exception {
    TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
    ConsumerGroupName consumerGroupName =
        ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);

    Map<Integer, ConsumerPartitionMetadata> partitions =
        new HashMap<Integer, ConsumerPartitionMetadata>() {
          {
            for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
              ConsumerPartitionMetadata partitionMetadata =
                  ConsumerPartitionMetadata.newBuilder()
                      .setOffset(partitionOffset.getValue())
                      .build();
              put(partitionOffset.getKey(), partitionMetadata);
            }
          }
        };
    ConsumerTopicMetadata topicMetadata =
        ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
    ConsumerGroup consumerGroup =
        ConsumerGroup.newBuilder()
            .setName(consumerGroupName.toString())
            .putTopics(topicName.toString(), topicMetadata)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateConsumerGroupRequest request =
          UpdateConsumerGroupRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConsumerGroup(consumerGroup)
              .build();
      // This operation is being handled synchronously.
      ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
      System.out.printf("Updated consumer group: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
    }
  }
}

Python

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# consumer_group_id = "my-consumer-group"
# topic_path = "my-topic-path"
# partition_offsets = {10: 10}

client = managedkafka_v1.ManagedKafkaClient()

consumer_group = managedkafka_v1.ConsumerGroup()
consumer_group.name = client.consumer_group_path(
    project_id, region, cluster_id, consumer_group_id
)

topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
for partition, offset in partition_offsets.items():
    partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
    topic_metadata.partitions[partition] = partition_metadata
consumer_group.topics = {
    topic_path: topic_metadata,
}

update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("topics")

request = managedkafka_v1.UpdateConsumerGroupRequest(
    update_mask=update_mask,
    consumer_group=consumer_group,
)

try:
    response = client.update_consumer_group(request=request)
    print("Updated consumer group:", response)
except NotFound as e:
    print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")

Étape suivante

Apache Kafka® est une marque déposée d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.