Puoi aggiornare un gruppo di consumatori di Google Cloud Managed Service per Apache Kafka per modificare gli offset per un elenco di partizioni di argomenti. In questo modo puoi controllare quali messaggi ricevono i consumatori del gruppo.
Per aggiornare un gruppo di consumatori, puoi utilizzare Google Cloud CLI, la libreria client, l'API Managed Kafka o le API Apache Kafka open source. La console Google Cloud non è supportata per la modifica di un gruppo di consumatori.
Prima di iniziare
Per aggiornare un gruppo di consumatori, assicurati prima che non stia consumando attivamente messaggi.
Un gruppo di consumatori viene eliminato automaticamente da Kafka se non ha mai consumato
messaggi o quando l'ultimo offset di commit è scaduto dopo
offsets.retention.minutes.
Segui questi passaggi prima di aggiornare un gruppo di consumatori:
Invia alcuni messaggi all'argomento da cui il tuo gruppo di consumatori legge i messaggi.
Avvia il gruppo di consumatori per elaborare alcuni messaggi.
Impedisci a tutti i tuoi consumer di utilizzare i messaggi. Per interrompere un consumer, premi Ctrl+C.
Per ulteriori informazioni sull'invio e sull'utilizzo dei messaggi, vedi Produzione e utilizzo dei messaggi con gli strumenti a riga di comando Kafka.
Ruoli e autorizzazioni richiesti per aggiornare un gruppo di consumer
Per ottenere le autorizzazioni necessarie per modificare i gruppi di consumatori, chiedi all'amministratore di concederti il ruolo IAM Editor gruppo di consumatori Kafka gestito (roles/managedkafka.consumerGroupEditor) nel progetto.
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Questo ruolo predefinito contiene le autorizzazioni necessarie per modificare i gruppi di consumatori. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:
Autorizzazioni obbligatorie
Per modificare i gruppi di consumatori sono necessarie le seguenti autorizzazioni:
-
Aggiorna i gruppi di consumatori:
managedkafka.consumerGroups.update
Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.
Per maggiori informazioni sul ruolo Editor gruppo di consumer Kafka gestito, vedi Ruoli predefiniti di Managed Service per Apache Kafka.
Concedi l'accesso READ al service agent
Per aggiornare gli offset del gruppo di consumer, l'agente di servizio richiede l'accesso all'operazione READ sulle risorse di argomento e gruppo di consumer. Questo accesso è configurato con ACL Apache Kafka.
Se non hai configurato elenchi ACL Apache Kafka per il gruppo di consumer e il relativo argomento all'interno del cluster, l'agente di servizio ha accesso ambientale a queste risorse. Puoi saltare questa sezione.
Se gli elenchi ACL Apache Kafka sono configurati per il gruppo di consumer e il relativo argomento all'interno del cluster, l'agente di servizio richiede l'accesso esplicito agli elenchi ACL per l'operazione READ per entrambe le risorse. A questo scopo, aggiungi voci ACL che concedano all'agente di servizio l'accesso all'operazione READ sul gruppo di consumatori e sull'argomento pertinenti. Segui questi passaggi:
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init Esegui il comando
gcloud managed-kafka acls add-acl-entry:gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=* gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=*
Sostituisci quanto segue:
CONSUMER_GROUP_ACL_ID(obbligatorio): l'ID univoco della risorsa ACL Managed Service per Apache Kafka in cui vuoi aggiungere la voce ACL per il gruppo di consumatori. Per applicare l'accesso a tutti i gruppi di consumer, utilizza `allConsumerGroups`. In alternativa, per un gruppo di consumer specifico, utilizza `consumerGroup/CONSUMER_GROUP_NAME`.TOPIC_ACL_ID(obbligatorio): l'ID univoco della risorsa ACL Managed Service per Apache Kafka in cui vuoi aggiungere la voce ACL per l'argomento. Per applicare l'accesso a tutti gli argomenti, utilizza `allTopics`. In alternativa, per un argomento specifico, utilizza `topic/TOPIC_NAME`.CLUSTER_ID(obbligatorio): l'ID del cluster contenente la risorsa ACL.LOCATION(obbligatorio): la regione in cui si trova il cluster. Consulta le località supportate.PROJECT_NUMBER(obbligatorio): il numero di progetto del progetto in cui si trova il cluster. Viene utilizzato per creare il nome dell'entità dell'agente di servizio per la voce ACL.
Per saperne di più sull'aggiunta di una voce ACL, consulta Aggiungere una voce ACL.
Aggiorna un gruppo di consumatori
Assicurati di aver completato i passaggi descritti nella sezione Prima di iniziare.
Per aggiornare un gruppo di consumatori:
gcloud
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Esegui il comando
gcloud managed-kafka consumer-groups update:gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --topics-file=TOPICS_FILE
Sostituisci quanto segue:
-
CLUSTER_ID: l'ID o il nome del cluster.
-
LOCATION: la posizione del cluster.
-
CONSUMER_GROUP_ID: l'ID o il nome del gruppo di consumatori.
-
TOPICS_FILE: questa impostazione specifica la posizione del file contenente la configurazione degli argomenti da aggiornare per il gruppo di consumatori. Il file può essere in formato JSON o YAML. Può essere un percorso file o includere direttamente i contenuti JSON o YAML.
Il file degli argomenti utilizza una struttura JSON per rappresentare una mappa degli argomenti
ConsumerGroup, nel formato{ topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Per ogni argomento,ConsumerPartitionMetadatafornisce l'offset e i metadati per ogni partizione.Per impostare l'offset per una singola partizione (partizione 0) in un argomento denominato
topic1su 10, la configurazione JSON sarà simile alla seguente:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}Di seguito è riportato un esempio dei contenuti di un file
topics.json:{ "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" }, "2": { "offset": "1", "metadata": "metadata" } } }, "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" } } } }
-
TOPIC_PATH: quando specifichi gli argomenti nel file JSON o YAML, includi il percorso completo dell'argomento, che può essere ottenuto eseguendo il comando
gcloud managed-kafak topics describee nel formatoprojects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .
-