Mettre à jour un groupe de consommateurs Google Cloud Managed Service pour Apache Kafka

Vous pouvez mettre à jour un groupe de consommateurs Google Cloud Managed Service pour Apache Kafka afin de modifier les décalages pour une liste de partitions de sujets. Cela vous permet de contrôler les messages que reçoivent les consommateurs du groupe.

Pour mettre à jour un groupe de consommateurs, vous pouvez utiliser la Google Cloud CLI, la bibliothèque cliente, l'API Managed Kafka ou les API Apache Kafka Open Source. La console Google Cloud n'est pas compatible avec la modification d'un groupe de consommateurs.

Avant de commencer

Pour mettre à jour un groupe de consommateurs, assurez-vous d'abord qu'il ne consomme pas activement de messages. Un groupe de consommateurs est automatiquement supprimé par Kafka s'il n'a jamais consommé de messages ou lorsque le dernier décalage validé a expiré après offsets.retention.minutes.

Avant de mettre à jour un groupe de consommateurs, procédez comme suit :

  1. Envoyez des messages au sujet à partir duquel votre groupe de consommateurs lit les messages.

  2. Démarrez votre groupe de consommateurs pour traiter quelques messages.

  3. Empêchez tous vos consommateurs de consommer des messages. Pour arrêter un consommateur, appuyez sur Ctrl+C.

Pour en savoir plus sur l'envoi et la consommation de messages, consultez Produire et consommer des messages avec les outils de ligne de commande Kafka.

Rôles et autorisations requis pour mettre à jour un groupe de consommateurs

Pour obtenir les autorisations nécessaires pour modifier vos groupes de consommateurs, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de groupes de consommateurs Kafka gérés (roles/managedkafka.consumerGroupEditor) sur votre projet. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour modifier vos groupes de consommateurs. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour modifier vos groupes de consommateurs :

  • Mettez à jour les groupes de consommateurs : managedkafka.consumerGroups.update

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Pour en savoir plus sur le rôle "Éditeur de groupes de consommateurs Kafka gérés", consultez Rôles prédéfinis de Managed Service pour Apache Kafka.

Accorder l'accès en lecture à l'agent de service

Pour mettre à jour les décalages du groupe de consommateurs, l'agent de service doit avoir accès à l'opération READ sur les ressources du thème et du groupe de consommateurs. Cet accès est configuré avec des LCA Apache Kafka.

Si vous n'avez configuré aucune LCA Apache Kafka pour le groupe de consommateurs et son sujet dans le cluster, l'agent de service dispose d'un accès ambiant à ces ressources. Vous pouvez ignorer cette section.

Si des LCA Apache Kafka sont configurées pour le groupe de consommateurs et son sujet dans le cluster, l'agent de service nécessite un accès LCA explicite pour l'opération READ pour les deux ressources. Pour ce faire, ajoutez des entrées de LCA accordant à l'agent de service l'accès à l'opération READ sur le groupe de consommateurs et le thème concernés. Procédez comme suit :

  1. Install the Google Cloud CLI.

  2. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Exécutez la commande gcloud managed-kafka acls add-acl-entry :

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    Remplacez les éléments suivants :

    • CONSUMER_GROUP_ACL_ID (obligatoire) : ID unique de la ressource LCA Managed Service pour Apache Kafka à laquelle vous souhaitez ajouter l'entrée LCA pour le groupe de consommateurs. Pour appliquer l'accès à tous les groupes de consommateurs, utilisez `allConsumerGroups`. Pour un groupe de consommateurs spécifique, utilisez `consumerGroup/NOM_GROUPE_CONSOMMATEURS`.
    • TOPIC_ACL_ID (obligatoire) : ID unique de la ressource LCA Managed Service pour Apache Kafka à laquelle vous souhaitez ajouter l'entrée LCA pour le sujet. Pour appliquer l'accès à tous les thèmes, utilisez `allTopics`. Pour un thème spécifique, utilisez `topic/TOPIC_NAME`.
    • CLUSTER_ID (obligatoire) : ID du cluster contenant la ressource ACL.
    • LOCATION (obligatoire) : région dans laquelle se trouve le cluster. Consultez les emplacements compatibles.
    • PROJECT_NUMBER (obligatoire) : numéro du projet dans lequel se trouve le cluster. Il permet de créer le nom principal de l'agent de service pour l'entrée LCA.

Pour savoir comment ajouter une entrée de LCA, consultez Ajouter une entrée de LCA.

Mettre à jour un groupe de consommateurs

Assurez-vous d'avoir effectué les étapes de la section Avant de commencer.

Pour modifier un groupe de consommateurs, procédez comme suit :

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud managed-kafka consumer-groups update :

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    Remplacez les éléments suivants :

    • CLUSTER_ID : ID ou nom du cluster.

    • LOCATION : emplacement du cluster.

    • CONSUMER_GROUP_ID : ID ou nom du groupe de consommateurs.

    • TOPICS_FILE : ce paramètre spécifie l'emplacement du fichier contenant la configuration des thèmes à mettre à jour pour le groupe de consommateurs. Le fichier peut être au format JSON ou YAML. Il peut s'agir d'un chemin d'accès à un fichier ou d'un contenu JSON ou YAML directement inclus.

      Le fichier de thème utilise une structure JSON pour représenter une carte des thèmes ConsumerGroup, sous la forme { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Pour chaque sujet, ConsumerPartitionMetadata fournit le décalage et les métadonnées de chaque partition.

      Pour définir le décalage d'une seule partition (partition 0) dans un sujet nommé topic1 sur 10, la configuration JSON se présente comme suit :{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      Voici un exemple du contenu d'un fichier topics.json :

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH : lorsque vous spécifiez des thèmes dans un fichier JSON ou YAML, incluez le chemin d'accès complet au thème, que vous pouvez obtenir en exécutant la commande gcloud managed-kafak topics describe et qui est au format projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .

  3. Go

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// consumerGroupID := "my-consumer-group"
    	// topicPath := "my-topic-path"
    	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)
    
    	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
    	for partition, offset := range partitionOffsets {
    		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
    			Offset: offset,
    		}
    	}
    	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
    		topicPath: {
    			Partitions: partitionMetadata,
    		},
    	}
    	consumerGroupConfig := managedkafkapb.ConsumerGroup{
    		Name:   consumerGroupPath,
    		Topics: topicConfig,
    	}
    	paths := []string{"topics"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConsumerGroupRequest{
    		UpdateMask:    updateMask,
    		ConsumerGroup: &consumerGroupConfig,
    	}
    	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
    	return nil
    }
    

    Java

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConsumerGroup;
    import com.google.cloud.managedkafka.v1.ConsumerGroupName;
    import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
    import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConsumerGroup {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        String consumerGroupId = "my-consumer-group";
        Map<Integer, Integer> partitionOffsets =
            new HashMap<Integer, Integer>() {
              {
                put(1, 10);
                put(2, 20);
                put(3, 30);
              }
            };
        updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
      }
    
      public static void updateConsumerGroup(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          String consumerGroupId,
          Map<Integer, Integer> partitionOffsets)
          throws Exception {
        TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
        ConsumerGroupName consumerGroupName =
            ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);
    
        Map<Integer, ConsumerPartitionMetadata> partitions =
            new HashMap<Integer, ConsumerPartitionMetadata>() {
              {
                for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
                  ConsumerPartitionMetadata partitionMetadata =
                      ConsumerPartitionMetadata.newBuilder()
                          .setOffset(partitionOffset.getValue())
                          .build();
                  put(partitionOffset.getKey(), partitionMetadata);
                }
              }
            };
        ConsumerTopicMetadata topicMetadata =
            ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
        ConsumerGroup consumerGroup =
            ConsumerGroup.newBuilder()
                .setName(consumerGroupName.toString())
                .putTopics(topicName.toString(), topicMetadata)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateConsumerGroupRequest request =
              UpdateConsumerGroupRequest.newBuilder()
                  .setUpdateMask(updateMask)
                  .setConsumerGroup(consumerGroup)
                  .build();
          // This operation is being handled synchronously.
          ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
          System.out.printf("Updated consumer group: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # consumer_group_id = "my-consumer-group"
    # topic_path = "my-topic-path"
    # partition_offsets = {10: 10}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    consumer_group = managedkafka_v1.ConsumerGroup()
    consumer_group.name = client.consumer_group_path(
        project_id, region, cluster_id, consumer_group_id
    )
    
    topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
    for partition, offset in partition_offsets.items():
        partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
        topic_metadata.partitions[partition] = partition_metadata
    consumer_group.topics = {
        topic_path: topic_metadata,
    }
    
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("topics")
    
    request = managedkafka_v1.UpdateConsumerGroupRequest(
        update_mask=update_mask,
        consumer_group=consumer_group,
    )
    
    try:
        response = client.update_consumer_group(request=request)
        print("Updated consumer group:", response)
    except NotFound as e:
        print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")
    

Étape suivante

Apache Kafka® est une marque déposée d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.