Aggiorna un gruppo di consumatori di Google Cloud Managed Service per Apache Kafka

Puoi aggiornare un gruppo di consumatori di Google Cloud Managed Service per Apache Kafka per modificare gli offset per un elenco di partizioni di argomenti. In questo modo puoi controllare quali messaggi ricevono i consumatori del gruppo.

Per aggiornare un gruppo di consumatori, puoi utilizzare Google Cloud CLI, la libreria client, l'API Managed Kafka o le API Apache Kafka open source. La console Google Cloud non è supportata per la modifica di un gruppo di consumatori.

Prima di iniziare

Per aggiornare un gruppo di consumatori, assicurati prima che non stia consumando attivamente messaggi. Un gruppo di consumatori viene eliminato automaticamente da Kafka se non ha mai consumato messaggi o quando l'ultimo offset di commit è scaduto dopo offsets.retention.minutes.

Segui questi passaggi prima di aggiornare un gruppo di consumatori:

  1. Invia alcuni messaggi all'argomento da cui il tuo gruppo di consumatori legge i messaggi.

  2. Avvia il gruppo di consumatori per elaborare alcuni messaggi.

  3. Impedisci a tutti i tuoi consumer di utilizzare i messaggi. Per interrompere un consumer, premi Ctrl+C.

Per ulteriori informazioni sull'invio e sull'utilizzo dei messaggi, vedi Produzione e utilizzo dei messaggi con gli strumenti a riga di comando Kafka.

Ruoli e autorizzazioni richiesti per aggiornare un gruppo di consumer

Per ottenere le autorizzazioni necessarie per modificare i gruppi di consumatori, chiedi all'amministratore di concederti il ruolo IAM Editor gruppo di consumatori Kafka gestito (roles/managedkafka.consumerGroupEditor) nel progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per modificare i gruppi di consumatori. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per modificare i gruppi di consumatori sono necessarie le seguenti autorizzazioni:

  • Aggiorna i gruppi di consumatori: managedkafka.consumerGroups.update

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Per maggiori informazioni sul ruolo Editor gruppo di consumer Kafka gestito, vedi Ruoli predefiniti di Managed Service per Apache Kafka.

Concedi l'accesso READ al service agent

Per aggiornare gli offset del gruppo di consumer, l'agente di servizio richiede l'accesso all'operazione READ sulle risorse di argomento e gruppo di consumer. Questo accesso è configurato con ACL Apache Kafka.

Se non hai configurato elenchi ACL Apache Kafka per il gruppo di consumer e il relativo argomento all'interno del cluster, l'agente di servizio ha accesso ambientale a queste risorse. Puoi saltare questa sezione.

Se gli elenchi ACL Apache Kafka sono configurati per il gruppo di consumer e il relativo argomento all'interno del cluster, l'agente di servizio richiede l'accesso esplicito agli elenchi ACL per l'operazione READ per entrambe le risorse. A questo scopo, aggiungi voci ACL che concedano all'agente di servizio l'accesso all'operazione READ sul gruppo di consumatori e sull'argomento pertinenti. Segui questi passaggi:

  1. Install the Google Cloud CLI.

  2. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Esegui il comando gcloud managed-kafka acls add-acl-entry:

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    Sostituisci quanto segue:

    • CONSUMER_GROUP_ACL_ID (obbligatorio): l'ID univoco della risorsa ACL Managed Service per Apache Kafka in cui vuoi aggiungere la voce ACL per il gruppo di consumatori. Per applicare l'accesso a tutti i gruppi di consumer, utilizza `allConsumerGroups`. In alternativa, per un gruppo di consumer specifico, utilizza `consumerGroup/CONSUMER_GROUP_NAME`.
    • TOPIC_ACL_ID (obbligatorio): l'ID univoco della risorsa ACL Managed Service per Apache Kafka in cui vuoi aggiungere la voce ACL per l'argomento. Per applicare l'accesso a tutti gli argomenti, utilizza `allTopics`. In alternativa, per un argomento specifico, utilizza `topic/TOPIC_NAME`.
    • CLUSTER_ID (obbligatorio): l'ID del cluster contenente la risorsa ACL.
    • LOCATION (obbligatorio): la regione in cui si trova il cluster. Consulta le località supportate.
    • PROJECT_NUMBER (obbligatorio): il numero di progetto del progetto in cui si trova il cluster. Viene utilizzato per creare il nome dell'entità dell'agente di servizio per la voce ACL.

Per saperne di più sull'aggiunta di una voce ACL, consulta Aggiungere una voce ACL.

Aggiorna un gruppo di consumatori

Assicurati di aver completato i passaggi descritti nella sezione Prima di iniziare.

Per aggiornare un gruppo di consumatori:

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Esegui il comando gcloud managed-kafka consumer-groups update:

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    Sostituisci quanto segue:

    • CLUSTER_ID: l'ID o il nome del cluster.

    • LOCATION: la posizione del cluster.

    • CONSUMER_GROUP_ID: l'ID o il nome del gruppo di consumatori.

    • TOPICS_FILE: questa impostazione specifica la posizione del file contenente la configurazione degli argomenti da aggiornare per il gruppo di consumatori. Il file può essere in formato JSON o YAML. Può essere un percorso file o includere direttamente i contenuti JSON o YAML.

      Il file degli argomenti utilizza una struttura JSON per rappresentare una mappa degli argomenti ConsumerGroup, nel formato { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Per ogni argomento, ConsumerPartitionMetadata fornisce l'offset e i metadati per ogni partizione.

      Per impostare l'offset per una singola partizione (partizione 0) in un argomento denominato topic1 su 10, la configurazione JSON sarà simile alla seguente:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      Di seguito è riportato un esempio dei contenuti di un file topics.json:

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: quando specifichi gli argomenti nel file JSON o YAML, includi il percorso completo dell'argomento, che può essere ottenuto eseguendo il comando gcloud managed-kafak topics describe e nel formato projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .

  3. Go

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// consumerGroupID := "my-consumer-group"
    	// topicPath := "my-topic-path"
    	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)
    
    	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
    	for partition, offset := range partitionOffsets {
    		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
    			Offset: offset,
    		}
    	}
    	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
    		topicPath: {
    			Partitions: partitionMetadata,
    		},
    	}
    	consumerGroupConfig := managedkafkapb.ConsumerGroup{
    		Name:   consumerGroupPath,
    		Topics: topicConfig,
    	}
    	paths := []string{"topics"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConsumerGroupRequest{
    		UpdateMask:    updateMask,
    		ConsumerGroup: &consumerGroupConfig,
    	}
    	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
    	return nil
    }
    

    Java

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConsumerGroup;
    import com.google.cloud.managedkafka.v1.ConsumerGroupName;
    import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
    import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConsumerGroup {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        String consumerGroupId = "my-consumer-group";
        Map<Integer, Integer> partitionOffsets =
            new HashMap<Integer, Integer>() {
              {
                put(1, 10);
                put(2, 20);
                put(3, 30);
              }
            };
        updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
      }
    
      public static void updateConsumerGroup(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          String consumerGroupId,
          Map<Integer, Integer> partitionOffsets)
          throws Exception {
        TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
        ConsumerGroupName consumerGroupName =
            ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);
    
        Map<Integer, ConsumerPartitionMetadata> partitions =
            new HashMap<Integer, ConsumerPartitionMetadata>() {
              {
                for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
                  ConsumerPartitionMetadata partitionMetadata =
                      ConsumerPartitionMetadata.newBuilder()
                          .setOffset(partitionOffset.getValue())
                          .build();
                  put(partitionOffset.getKey(), partitionMetadata);
                }
              }
            };
        ConsumerTopicMetadata topicMetadata =
            ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
        ConsumerGroup consumerGroup =
            ConsumerGroup.newBuilder()
                .setName(consumerGroupName.toString())
                .putTopics(topicName.toString(), topicMetadata)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateConsumerGroupRequest request =
              UpdateConsumerGroupRequest.newBuilder()
                  .setUpdateMask(updateMask)
                  .setConsumerGroup(consumerGroup)
                  .build();
          // This operation is being handled synchronously.
          ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
          System.out.printf("Updated consumer group: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # consumer_group_id = "my-consumer-group"
    # topic_path = "my-topic-path"
    # partition_offsets = {10: 10}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    consumer_group = managedkafka_v1.ConsumerGroup()
    consumer_group.name = client.consumer_group_path(
        project_id, region, cluster_id, consumer_group_id
    )
    
    topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
    for partition, offset in partition_offsets.items():
        partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
        topic_metadata.partitions[partition] = partition_metadata
    consumer_group.topics = {
        topic_path: topic_metadata,
    }
    
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("topics")
    
    request = managedkafka_v1.UpdateConsumerGroupRequest(
        update_mask=update_mask,
        consumer_group=consumer_group,
    )
    
    try:
        response = client.update_consumer_group(request=request)
        print("Updated consumer group:", response)
    except NotFound as e:
        print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")
    

Passaggi successivi

Apache Kafka® è un marchio registrato di Apache Software Foundation o delle sue affiliate negli Stati Uniti e/o in altri paesi.