Puoi aggiornare un gruppo di consumer Google Cloud Managed Service per Apache Kafka per modificare gli offset per un elenco di partizioni di argomenti. In questo modo puoi controllare quali messaggi ricevono i consumatori del gruppo.
Per aggiornare un gruppo di consumatori, puoi utilizzare Google Cloud CLI, la libreria client, l'API Managed Kafka o le API Apache Kafka open source. La console Google Cloud non è supportata per la modifica di un gruppo di consumatori.
Prima di iniziare
Per aggiornare un gruppo di consumatori, assicurati prima che non stia consumando attivamente messaggi.
Un gruppo di consumatori viene eliminato automaticamente da Kafka se non ha mai consumato
messaggi o quando l'ultimo offset di commit è scaduto dopo
offsets.retention.minutes.
Segui questi passaggi prima di aggiornare un gruppo di consumatori:
Invia alcuni messaggi all'argomento da cui il tuo gruppo di consumatori legge i messaggi.
Avvia il gruppo di consumatori per elaborare alcuni messaggi.
Impedisci a tutti i tuoi consumer di utilizzare i messaggi. Per interrompere un consumer, premi Ctrl+C.
Per saperne di più sull'invio e sull'utilizzo dei messaggi, vedi Produci e utilizza messaggi con gli strumenti a riga di comando Kafka.
Ruoli e autorizzazioni richiesti per aggiornare un gruppo di consumatori
Per ottenere le autorizzazioni necessarie per modificare i gruppi di consumatori, chiedi all'amministratore di concederti il ruolo IAM Editor gruppo di consumatori Kafka gestito (roles/managedkafka.consumerGroupEditor) nel progetto.
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Questo ruolo predefinito contiene le autorizzazioni necessarie per modificare i gruppi di consumatori. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:
Autorizzazioni obbligatorie
Per modificare i gruppi di consumatori sono necessarie le seguenti autorizzazioni:
-
Aggiorna i gruppi di consumatori:
managedkafka.consumerGroups.update
Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.
Per maggiori informazioni sul ruolo Editor gruppo di consumer Kafka gestito, consulta Ruoli predefiniti di Managed Service per Apache Kafka.
Concedi l'accesso READ al service agent
Per aggiornare gli offset del gruppo di consumer, l'agente di servizio richiede l'accesso all'operazione READ sulle risorse di argomento e gruppo di consumer. Questo accesso è configurato con ACL Apache Kafka.
Se non hai configurato elenchi ACL Apache Kafka per il gruppo di consumer e il relativo argomento all'interno del cluster, l'agente di servizio ha accesso ambientale a queste risorse. Puoi saltare questa sezione.
Se gli elenchi ACL Apache Kafka sono configurati per il gruppo di consumer e il relativo argomento all'interno del cluster, l'agente di servizio richiede l'accesso ACL esplicito per l'operazione READ per entrambe le risorse. Per farlo, aggiungi voci ACL che concedano all'agente di servizio l'accesso all'operazione READ sul gruppo di consumatori e sull'argomento pertinenti. Segui questi passaggi:
-
Installa Google Cloud CLI.
-
Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.
-
Per inizializzare gcloud CLI, esegui questo comando:
gcloud init Esegui il comando
gcloud managed-kafka acls add-acl-entry:gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=* gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \ --operation=READ --permission-type=ALLOW --host=*
Sostituisci quanto segue:
CONSUMER_GROUP_ACL_ID(obbligatorio): l'ID univoco della risorsa ACL Managed Service per Apache Kafka in cui vuoi aggiungere la voce ACL per il gruppo di consumatori. Per applicare l'accesso a tutti i gruppi di consumer, utilizza `allConsumerGroups`. In alternativa, per un gruppo di consumer specifico, utilizza `consumerGroup/CONSUMER_GROUP_NAME`.TOPIC_ACL_ID(obbligatorio): l'ID univoco della risorsa ACL Managed Service per Apache Kafka in cui vuoi aggiungere la voce ACL per l'argomento. Per applicare l'accesso a tutti gli argomenti, utilizza `allTopics`. In alternativa, per un argomento specifico, utilizza `topic/TOPIC_NAME`.CLUSTER_ID(obbligatorio): l'ID del cluster che contiene la risorsa ACL.LOCATION(obbligatorio): la regione in cui si trova il cluster. Consulta le località supportate.PROJECT_NUMBER(obbligatorio): il numero di progetto del progetto in cui si trova il cluster. Viene utilizzato per creare il nome dell'entità dell'agente di servizio per la voce ACL.
Per ulteriori informazioni sull'aggiunta di una voce ACL, vedi Aggiungere una voce ACL.
Aggiorna un gruppo di consumatori
Assicurati di aver completato i passaggi descritti nella sezione Prima di iniziare.
Per aggiornare un gruppo di consumatori:
gcloud
-
Nella console Google Cloud , attiva Cloud Shell.
Nella parte inferiore della console Google Cloud viene avviata una sessione di Cloud Shell e viene visualizzato un prompt della riga di comando. Cloud Shell è un ambiente shell con Google Cloud CLI già installata e con valori già impostati per il progetto corrente. L'inizializzazione della sessione può richiedere alcuni secondi.
Esegui il comando
gcloud managed-kafka consumer-groups update:gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \ --cluster=CLUSTER_ID \ --location=LOCATION \ --topics-file=TOPICS_FILE
Sostituisci quanto segue:
-
CLUSTER_ID: l'ID o il nome del cluster.
-
LOCATION: la posizione del cluster.
-
CONSUMER_GROUP_ID: l'ID o il nome del gruppo di consumatori.
-
TOPICS_FILE: Questa impostazione specifica la posizione del file contenente la configurazione degli argomenti da aggiornare per il gruppo di consumatori. Il file può essere in formato JSON o YAML. Può essere un percorso file o includere direttamente i contenuti JSON o YAML.
Il file degli argomenti utilizza una struttura JSON per rappresentare una mappa degli argomenti
ConsumerGroup, nel formato{ topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Per ogni argomento,ConsumerPartitionMetadatafornisce l'offset e i metadati per ogni partizione.Per impostare l'offset per una singola partizione (partizione 0) in un argomento denominato
topic1su 10, la configurazione JSON sarà simile alla seguente:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}Di seguito è riportato un esempio dei contenuti di un file
topics.json:{ "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" }, "2": { "offset": "1", "metadata": "metadata" } } }, "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": { "partitions": { "1": { "offset": "1", "metadata": "metadata" } } } }
-
TOPIC_PATH: quando specifichi gli argomenti nel file JSON o YAML, includi il percorso completo dell'argomento, che può essere ottenuto eseguendo il comando
gcloud managed-kafak topics describee nel formatoprojects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .
-