Google Cloud Managed Service for Apache Kafka-Consumergruppe aktualisieren

Sie können eine Google Cloud Managed Service for Apache Kafka-Nutzergruppe aktualisieren, um die Offsets für eine Liste von Themenpartitionen zu ändern. So können Sie steuern, welche Nachrichten die Nutzer in der Gruppe erhalten.

Sie können eine Consumer-Gruppe mit der Google Cloud CLI, der Clientbibliothek, der Managed Kafka API oder den Open-Source-Apache Kafka APIs aktualisieren. Die Google Cloud Console wird nicht zum Bearbeiten einer Verbrauchergruppe unterstützt.

Hinweise

Wenn Sie eine Consumer-Gruppe aktualisieren möchten, muss sie zuerst inaktiv sein. Eine Consumer-Gruppe wird von Kafka automatisch gelöscht, wenn sie noch nie Nachrichten verarbeitet hat oder wenn der letzte committete Offset nach offsets.retention.minutes abgelaufen ist.

Führen Sie diese Schritte aus, bevor Sie eine Verbrauchergruppe aktualisieren:

  1. Senden Sie einige Nachrichten an das Thema, aus dem Ihre Nutzergruppe Nachrichten liest.

  2. Starten Sie die Verbrauchergruppe, um einige Nachrichten zu verarbeiten.

  3. Verhindern Sie, dass alle Ihre Consumer Nachrichten verarbeiten. Drücken Sie Strg+C, um einen Consumer zu beenden.

Weitere Informationen zum Senden und Empfangen von Nachrichten finden Sie unter Nachrichten mit den Kafka-Befehlszeilentools senden und empfangen.

Erforderliche Rollen und Berechtigungen zum Aktualisieren einer Verbrauchergruppe

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Consumer Group Editor (roles/managedkafka.consumerGroupEditor) für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Bearbeiten Ihrer Consumer-Gruppen benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Bearbeiten Ihrer Verbrauchergruppen erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Bearbeiten Ihrer Verbrauchergruppen erforderlich:

  • Verbrauchergruppen aktualisieren: managedkafka.consumerGroups.update

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Weitere Informationen zur Rolle „Managed Kafka Consumer Group Editor“ finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.

Dienst-Agent Lesezugriff gewähren

Um Consumer-Gruppen-Offsets zu aktualisieren, benötigt der Dienst-Agent Zugriff auf den READ-Vorgang für die Topic- und Consumer-Gruppen-Ressourcen. Dieser Zugriff wird mit Apache Kafka-ACLs konfiguriert.

Wenn Sie keine Apache Kafka-ACLs für die Consumer-Gruppe und das zugehörige Thema im Cluster konfiguriert haben, hat der Dienst-Agent Zugriff auf diese Ressourcen. Sie können diesen Abschnitt überspringen.

Wenn Apache Kafka-ACLs für die Consumer-Gruppe und das zugehörige Thema im Cluster konfiguriert sind, benötigt der Dienst-Agent expliziten ACL-Zugriff für den READ-Vorgang für beide Ressourcen. Fügen Sie dazu ACL-Einträge hinzu, die dem Dienst-Agent Zugriff auf den READ-Vorgang für die entsprechende Verbrauchergruppe und das entsprechende Thema gewähren. Gehen Sie so vor:

  1. Install the Google Cloud CLI.

  2. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Führen Sie den Befehl gcloud managed-kafka acls add-acl-entry aus:

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    Ersetzen Sie Folgendes:

    • CONSUMER_GROUP_ACL_ID (erforderlich): Die eindeutige ID der ACL-Ressource von Managed Service for Apache Kafka, der Sie den ACL-Eintrag für die Consumer-Gruppe hinzufügen möchten. Wenn Sie den Zugriff auf alle Nutzergruppen anwenden möchten, verwenden Sie `allConsumerGroups`. Für eine bestimmte Nutzergruppe verwenden Sie `consumerGroup/CONSUMER_GROUP_NAME`.
    • TOPIC_ACL_ID (erforderlich): Die eindeutige ID der Managed Service for Apache Kafka-ACL-Ressource, der Sie den ACL-Eintrag für das Thema hinzufügen möchten. Wenn Sie den Zugriff auf alle Themen anwenden möchten, verwenden Sie `allTopics`. Für ein bestimmtes Thema verwenden Sie `topic/TOPIC_NAME`.
    • CLUSTER_ID (erforderlich): Die ID des Clusters, der die ACL-Ressource enthält.
    • LOCATION (erforderlich): Die Region, in der sich der Cluster befindet. Unterstützte Standorte
    • PROJECT_NUMBER (erforderlich): Die Projektnummer des Projekts, in dem sich der Cluster befindet. Damit wird der Prinzipalname des Dienst-Agents für den ACL-Eintrag erstellt.

Weitere Informationen zum Hinzufügen eines ACL-Eintrags finden Sie unter ACL-Eintrag hinzufügen.

Verbrauchergruppe aktualisieren

Prüfen Sie, ob Sie die Schritte im Abschnitt Vorbereitung ausgeführt haben.

So aktualisieren Sie eine Verbrauchergruppe:

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Führen Sie den Befehl gcloud managed-kafka consumer-groups update aus:

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    Ersetzen Sie Folgendes:

    • CLUSTER_ID: Die ID oder der Name des Clusters.

    • LOCATION: Der Standort des Clusters.

    • CONSUMER_GROUP_ID: Die ID oder der Name der Nutzergruppe.

    • TOPICS_FILE: Mit dieser Einstellung wird der Speicherort der Datei angegeben, die die Konfiguration der Themen enthält, die für die Verbrauchergruppe aktualisiert werden sollen. Die Datei kann im JSON- oder YAML-Format vorliegen. Es kann sich um einen Dateipfad handeln oder der JSON- oder YAML-Inhalt kann direkt angegeben werden.

      In der Themen-Datei wird eine JSON-Struktur verwendet, um eine ConsumerGroup-Themenübersicht im Format { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}} darzustellen. Für jedes Thema enthält ConsumerPartitionMetadata den Offset und die Metadaten für jede Partition.

      Wenn Sie den Offset für eine einzelne Partition (Partition 0) in einem Thema mit dem Namen topic1 auf 10 festlegen möchten, sieht die JSON-Konfiguration so aus:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      Im Folgenden finden Sie ein Beispiel für den Inhalt einer topics.json-Datei:

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: Wenn Sie Themen in einer JSON- oder YAML-Datei angeben, geben Sie den vollständigen Themenpfad an, den Sie durch Ausführen des Befehls gcloud managed-kafak topics describe erhalten können. Der Pfad hat das Format projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .

  3. Go

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// consumerGroupID := "my-consumer-group"
    	// topicPath := "my-topic-path"
    	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)
    
    	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
    	for partition, offset := range partitionOffsets {
    		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
    			Offset: offset,
    		}
    	}
    	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
    		topicPath: {
    			Partitions: partitionMetadata,
    		},
    	}
    	consumerGroupConfig := managedkafkapb.ConsumerGroup{
    		Name:   consumerGroupPath,
    		Topics: topicConfig,
    	}
    	paths := []string{"topics"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateConsumerGroupRequest{
    		UpdateMask:    updateMask,
    		ConsumerGroup: &consumerGroupConfig,
    	}
    	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
    	return nil
    }
    

    Java

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConsumerGroup;
    import com.google.cloud.managedkafka.v1.ConsumerGroupName;
    import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
    import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateConsumerGroup {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        String consumerGroupId = "my-consumer-group";
        Map<Integer, Integer> partitionOffsets =
            new HashMap<Integer, Integer>() {
              {
                put(1, 10);
                put(2, 20);
                put(3, 30);
              }
            };
        updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
      }
    
      public static void updateConsumerGroup(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          String consumerGroupId,
          Map<Integer, Integer> partitionOffsets)
          throws Exception {
        TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
        ConsumerGroupName consumerGroupName =
            ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);
    
        Map<Integer, ConsumerPartitionMetadata> partitions =
            new HashMap<Integer, ConsumerPartitionMetadata>() {
              {
                for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
                  ConsumerPartitionMetadata partitionMetadata =
                      ConsumerPartitionMetadata.newBuilder()
                          .setOffset(partitionOffset.getValue())
                          .build();
                  put(partitionOffset.getKey(), partitionMetadata);
                }
              }
            };
        ConsumerTopicMetadata topicMetadata =
            ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
        ConsumerGroup consumerGroup =
            ConsumerGroup.newBuilder()
                .setName(consumerGroupName.toString())
                .putTopics(topicName.toString(), topicMetadata)
                .build();
        FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();
    
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateConsumerGroupRequest request =
              UpdateConsumerGroupRequest.newBuilder()
                  .setUpdateMask(updateMask)
                  .setConsumerGroup(consumerGroup)
                  .build();
          // This operation is being handled synchronously.
          ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
          System.out.printf("Updated consumer group: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # consumer_group_id = "my-consumer-group"
    # topic_path = "my-topic-path"
    # partition_offsets = {10: 10}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    consumer_group = managedkafka_v1.ConsumerGroup()
    consumer_group.name = client.consumer_group_path(
        project_id, region, cluster_id, consumer_group_id
    )
    
    topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
    for partition, offset in partition_offsets.items():
        partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
        topic_metadata.partitions[partition] = partition_metadata
    consumer_group.topics = {
        topic_path: topic_metadata,
    }
    
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.append("topics")
    
    request = managedkafka_v1.UpdateConsumerGroupRequest(
        update_mask=update_mask,
        consumer_group=consumer_group,
    )
    
    try:
        response = client.update_consumer_group(request=request)
        print("Updated consumer group:", response)
    except NotFound as e:
        print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")
    

Nächste Schritte

Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder deren Tochtergesellschaften in den USA und/oder anderen Ländern.