Aggiorna un gruppo di consumatori di Google Cloud Managed Service per Apache Kafka

Puoi aggiornare un gruppo di consumer Google Cloud Managed Service per Apache Kafka per modificare gli offset per un elenco di partizioni di argomenti. In questo modo puoi controllare quali messaggi ricevono i consumatori del gruppo.

Per aggiornare un gruppo di consumatori, puoi utilizzare Google Cloud CLI, la libreria client, l'API Managed Kafka o le API Apache Kafka open source. La console Google Cloud non è supportata per la modifica di un gruppo di consumatori.

Prima di iniziare

Per aggiornare un gruppo di consumatori, assicurati prima che non stia consumando attivamente messaggi. Un gruppo di consumatori viene eliminato automaticamente da Kafka se non ha mai consumato messaggi o quando l'ultimo offset di commit è scaduto dopo offsets.retention.minutes.

Segui questi passaggi prima di aggiornare un gruppo di consumatori:

  1. Invia alcuni messaggi all'argomento da cui il tuo gruppo di consumatori legge i messaggi.

  2. Avvia il gruppo di consumatori per elaborare alcuni messaggi.

  3. Impedisci a tutti i tuoi consumer di utilizzare i messaggi. Per interrompere un consumer, premi Ctrl+C.

Per saperne di più sull'invio e sull'utilizzo dei messaggi, vedi Produci e utilizza messaggi con gli strumenti a riga di comando Kafka.

Ruoli e autorizzazioni richiesti per aggiornare un gruppo di consumatori

Per ottenere le autorizzazioni necessarie per modificare i gruppi di consumatori, chiedi all'amministratore di concederti il ruolo IAM Editor gruppo di consumatori Kafka gestito (roles/managedkafka.consumerGroupEditor) nel progetto. Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Questo ruolo predefinito contiene le autorizzazioni necessarie per modificare i gruppi di consumatori. Per vedere quali sono esattamente le autorizzazioni richieste, espandi la sezione Autorizzazioni obbligatorie:

Autorizzazioni obbligatorie

Per modificare i gruppi di consumatori sono necessarie le seguenti autorizzazioni:

  • Aggiorna i gruppi di consumatori: managedkafka.consumerGroups.update

Potresti anche ottenere queste autorizzazioni con ruoli personalizzati o altri ruoli predefiniti.

Per maggiori informazioni sul ruolo Editor gruppo di consumer Kafka gestito, consulta Ruoli predefiniti di Managed Service per Apache Kafka.

Concedi l'accesso READ al service agent

Per aggiornare gli offset del gruppo di consumer, l'agente di servizio richiede l'accesso all'operazione READ sulle risorse di argomento e gruppo di consumer. Questo accesso è configurato con ACL Apache Kafka.

Se non hai configurato elenchi ACL Apache Kafka per il gruppo di consumer e il relativo argomento all'interno del cluster, l'agente di servizio ha accesso ambientale a queste risorse. Puoi saltare questa sezione.

Se gli elenchi ACL Apache Kafka sono configurati per il gruppo di consumer e il relativo argomento all'interno del cluster, l'agente di servizio richiede l'accesso ACL esplicito per l'operazione READ per entrambe le risorse. Per farlo, aggiungi voci ACL che concedano all'agente di servizio l'accesso all'operazione READ sul gruppo di consumatori e sull'argomento pertinenti. Segui questi passaggi:

  1. Installa Google Cloud CLI.

  2. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.

  3. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  4. Esegui il comando gcloud managed-kafka acls add-acl-entry:

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    Sostituisci quanto segue:

    • CONSUMER_GROUP_ACL_ID (obbligatorio): l'ID univoco della risorsa ACL Managed Service per Apache Kafka in cui vuoi aggiungere la voce ACL per il gruppo di consumatori. Per applicare l'accesso a tutti i gruppi di consumer, utilizza `allConsumerGroups`. In alternativa, per un gruppo di consumer specifico, utilizza `consumerGroup/CONSUMER_GROUP_NAME`.
    • TOPIC_ACL_ID (obbligatorio): l'ID univoco della risorsa ACL Managed Service per Apache Kafka in cui vuoi aggiungere la voce ACL per l'argomento. Per applicare l'accesso a tutti gli argomenti, utilizza `allTopics`. In alternativa, per un argomento specifico, utilizza `topic/TOPIC_NAME`.
    • CLUSTER_ID (obbligatorio): l'ID del cluster che contiene la risorsa ACL.
    • LOCATION (obbligatorio): la regione in cui si trova il cluster. Consulta le località supportate.
    • PROJECT_NUMBER (obbligatorio): il numero di progetto del progetto in cui si trova il cluster. Viene utilizzato per creare il nome dell'entità dell'agente di servizio per la voce ACL.

Per ulteriori informazioni sull'aggiunta di una voce ACL, vedi Aggiungere una voce ACL.

Aggiorna un gruppo di consumatori

Assicurati di aver completato i passaggi descritti nella sezione Prima di iniziare.

Per aggiornare un gruppo di consumatori:

gcloud

  1. Nella console Google Cloud , attiva Cloud Shell.

    Attiva Cloud Shell

    Nella parte inferiore della console Google Cloud viene avviata una sessione di Cloud Shell e viene visualizzato un prompt della riga di comando. Cloud Shell è un ambiente shell con Google Cloud CLI già installata e con valori già impostati per il progetto corrente. L'inizializzazione della sessione può richiedere alcuni secondi.

  2. Esegui il comando gcloud managed-kafka consumer-groups update:

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    Sostituisci quanto segue:

    • CLUSTER_ID: l'ID o il nome del cluster.

    • LOCATION: la posizione del cluster.

    • CONSUMER_GROUP_ID: l'ID o il nome del gruppo di consumatori.

    • TOPICS_FILE: Questa impostazione specifica la posizione del file contenente la configurazione degli argomenti da aggiornare per il gruppo di consumatori. Il file può essere in formato JSON o YAML. Può essere un percorso file o includere direttamente i contenuti JSON o YAML.

      Il file degli argomenti utilizza una struttura JSON per rappresentare una mappa degli argomenti ConsumerGroup, nel formato { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. Per ogni argomento, ConsumerPartitionMetadata fornisce l'offset e i metadati per ogni partizione.

      Per impostare l'offset per una singola partizione (partizione 0) in un argomento denominato topic1 su 10, la configurazione JSON sarà simile alla seguente:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      Di seguito è riportato un esempio dei contenuti di un file topics.json:

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: quando specifichi gli argomenti nel file JSON o YAML, includi il percorso completo dell'argomento, che può essere ottenuto eseguendo il comando gcloud managed-kafak topics describe e nel formato projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// consumerGroupID := "my-consumer-group"
	// topicPath := "my-topic-path"
	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)

	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
	for partition, offset := range partitionOffsets {
		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
			Offset: offset,
		}
	}
	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
		topicPath: {
			Partitions: partitionMetadata,
		},
	}
	consumerGroupConfig := managedkafkapb.ConsumerGroup{
		Name:   consumerGroupPath,
		Topics: topicConfig,
	}
	paths := []string{"topics"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateConsumerGroupRequest{
		UpdateMask:    updateMask,
		ConsumerGroup: &consumerGroupConfig,
	}
	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
	}
	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
	return nil
}

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConsumerGroup;
import com.google.cloud.managedkafka.v1.ConsumerGroupName;
import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class UpdateConsumerGroup {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    String consumerGroupId = "my-consumer-group";
    Map<Integer, Integer> partitionOffsets =
        new HashMap<Integer, Integer>() {
          {
            put(1, 10);
            put(2, 20);
            put(3, 30);
          }
        };
    updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
  }

  public static void updateConsumerGroup(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      String consumerGroupId,
      Map<Integer, Integer> partitionOffsets)
      throws Exception {
    TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
    ConsumerGroupName consumerGroupName =
        ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);

    Map<Integer, ConsumerPartitionMetadata> partitions =
        new HashMap<Integer, ConsumerPartitionMetadata>() {
          {
            for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
              ConsumerPartitionMetadata partitionMetadata =
                  ConsumerPartitionMetadata.newBuilder()
                      .setOffset(partitionOffset.getValue())
                      .build();
              put(partitionOffset.getKey(), partitionMetadata);
            }
          }
        };
    ConsumerTopicMetadata topicMetadata =
        ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
    ConsumerGroup consumerGroup =
        ConsumerGroup.newBuilder()
            .setName(consumerGroupName.toString())
            .putTopics(topicName.toString(), topicMetadata)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateConsumerGroupRequest request =
          UpdateConsumerGroupRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConsumerGroup(consumerGroup)
              .build();
      // This operation is being handled synchronously.
      ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
      System.out.printf("Updated consumer group: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
    }
  }
}

Python

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# consumer_group_id = "my-consumer-group"
# topic_path = "my-topic-path"
# partition_offsets = {10: 10}

client = managedkafka_v1.ManagedKafkaClient()

consumer_group = managedkafka_v1.ConsumerGroup()
consumer_group.name = client.consumer_group_path(
    project_id, region, cluster_id, consumer_group_id
)

topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
for partition, offset in partition_offsets.items():
    partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
    topic_metadata.partitions[partition] = partition_metadata
consumer_group.topics = {
    topic_path: topic_metadata,
}

update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("topics")

request = managedkafka_v1.UpdateConsumerGroupRequest(
    update_mask=update_mask,
    consumer_group=consumer_group,
)

try:
    response = client.update_consumer_group(request=request)
    print("Updated consumer group:", response)
except NotFound as e:
    print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")

Passaggi successivi

Apache Kafka® è un marchio registrato di Apache Software Foundation o delle sue affiliate negli Stati Uniti e/o in altri paesi.