Google Cloud Managed Service for Apache Kafka-Consumergruppe aktualisieren

Sie können eine Google Cloud Managed Service for Apache Kafka-Nutzergruppe aktualisieren, um die Offsets für eine Liste von Themenpartitionen zu ändern. So können Sie steuern, welche Nachrichten die Consumer in der Gruppe erhalten.

Zum Aktualisieren einer Nutzergruppe können Sie die Google Cloud CLI, die Clientbibliothek, die Managed Kafka API oder die Open-Source-Apache Kafka APIs verwenden. Die Google Cloud Console wird für die Bearbeitung einer Nutzergruppe nicht unterstützt.

Hinweis

Bevor Sie eine Nutzergruppe aktualisieren, muss sie inaktiv sein. Eine Nutzergruppe wird von Kafka automatisch gelöscht, wenn sie noch nie Nachrichten verarbeitet hat oder wenn das letzte bestätigte Offset nach offsets.retention.minutes abgelaufen ist.

Führen Sie die folgenden Schritte aus, bevor Sie eine Nutzergruppe aktualisieren:

  1. Senden Sie einige Nachrichten an das Thema, aus dem Ihre Nutzergruppe Nachrichten liest.

  2. Starten Sie Ihre Nutzergruppe, um einige Nachrichten zu verarbeiten.

  3. Verhindern Sie, dass alle Ihre Consumer Nachrichten verarbeiten. Drücken Sie Strg+C, um einen Consumer zu beenden.

Weitere Informationen zum Senden und Verarbeiten von Nachrichten finden Sie unter Nachrichten mit den Kafka-Befehlszeilentools produzieren und verarbeiten.

Erforderliche Rollen und Berechtigungen zum Aktualisieren einer Nutzergruppe

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Consumer Group Editor (roles/managedkafka.consumerGroupEditor) für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Bearbeiten Ihrer Nutzergruppen benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Bearbeiten Ihrer Nutzergruppen erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen , um die notwendigen Berechtigungen anzuzeigen, die erforderlich sind:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Bearbeiten Ihrer Nutzergruppen erforderlich:

  • Nutzergruppen aktualisieren: managedkafka.consumerGroups.update

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Weitere Informationen zur Rolle „Managed Kafka Consumer Group Editor“ finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.

Dienst-Agent LESEZUGRIFF gewähren

Um die Offsets der Nutzergruppe zu aktualisieren, benötigt der Dienst-Agent Zugriff auf den Vorgang „LESEN“ für die Ressourcen „Thema“ und „Nutzergruppe“. Dieser Zugriff wird mit Apache Kafka-ACLs konfiguriert.

Wenn Sie keine Apache Kafka-ACLs für die Nutzergruppe und das zugehörige Thema im Cluster konfiguriert haben, hat der Dienst-Agent impliziten Zugriff auf diese Ressourcen. Sie können diesen Abschnitt überspringen.

Wenn Apache Kafka-ACLs für die Nutzergruppe und das zugehörige Thema im Cluster konfiguriert sind, benötigt der Dienst-Agent expliziten ACL-Zugriff für den Vorgang „LESEN“ für beide Ressourcen. Fügen Sie dazu ACL-Einträge hinzu, die dem Dienst-Agent Zugriff auf den Vorgang „LESEN“ für die entsprechende Nutzergruppe und das entsprechende Thema gewähren. Gehen Sie so vor:

  1. Installieren Sie die Google Cloud CLI.

  2. Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

  3. Führen Sie den folgenden Befehl aus, um die gcloud CLI zu initialisieren:

    gcloud init
  4. Führen Sie den gcloud managed-kafka acls add-acl-entry Befehl aus:

    gcloud managed-kafka acls add-acl-entry CONSUMER_GROUP_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*
    
    
      gcloud managed-kafka acls add-acl-entry TOPIC_ACL_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION \
      --principal=User:__AUTH_TOKEN__service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com \
      --operation=READ
      --permission-type=ALLOW
      --host=*

    Ersetzen Sie Folgendes:

    • CONSUMER_GROUP_ACL_ID (erforderlich): die eindeutige ID der Managed Service for Apache Kafka-ACL-Ressource, der Sie den ACL-Eintrag für die Nutzergruppe hinzufügen möchten. Wenn Sie den Zugriff auf alle Nutzergruppen anwenden möchten, verwenden Sie `allConsumerGroups`. Für eine bestimmte Nutzergruppe verwenden Sie `consumerGroup/CONSUMER_GROUP_NAME`.
    • TOPIC_ACL_ID (erforderlich): die eindeutige ID der Managed Service for Apache Kafka-ACL-Ressource, der Sie den ACL-Eintrag für das Thema hinzufügen möchten. Wenn Sie den Zugriff auf alle Themen anwenden möchten, verwenden Sie `allTopics`. Für ein bestimmtes Thema verwenden Sie `topic/TOPIC_NAME`.
    • CLUSTER_ID (erforderlich): die ID des Clusters der die ACL-Ressource enthält.
    • LOCATION (erforderlich): die Region, in der sich der Cluster befindet. Weitere Informationen finden Sie unter Unterstützte Standorte.
    • PROJECT_NUMBER (erforderlich): die Projektnummer des Projekts, in dem sich der Cluster befindet. Diese wird verwendet, um den Prinzipal namen des Dienst-Agents für den ACL-Eintrag zu erstellen.

Weitere Informationen zum Hinzufügen eines ACL-Eintrags finden Sie unter ACL-Eintrag hinzufügen.

Nutzergruppe aktualisieren

Achten Sie darauf, dass Sie die Schritte im Abschnitt Hinweis ausgeführt haben.

So aktualisieren Sie eine Nutzergruppe:

gcloud

  1. Aktivieren Sie Cloud Shell in der Google Cloud Console.

    Cloud Shell aktivieren

    Unten in der Google Cloud Console wird eine Cloud Shell Sitzung gestartet und eine Befehlszeilenaufforderung angezeigt. Cloud Shell ist eine Shell-Umgebung in der das Google Cloud CLI bereits installiert ist und Werte für Ihr aktuelles Projekt bereits festgelegt sind. Das Initialisieren der Sitzung kann einige Sekunden dauern.

  2. Führen Sie den gcloud managed-kafka consumer-groups update Befehl aus:

    gcloud managed-kafka consumer-groups update CONSUMER_GROUP_ID \
        --cluster=CLUSTER_ID \
        --location=LOCATION \
        --topics-file=TOPICS_FILE

    Ersetzen Sie Folgendes:

    • CLUSTER_ID: Die ID oder der Name des Clusters.

    • LOCATION: Der Standort des Clusters.

    • CONSUMER_GROUP_ID: Die ID oder der Name der Nutzergruppe.

    • TOPICS_FILE: Diese Einstellung gibt den Speicherort der Datei an, die die Konfiguration der Themen enthält, die für die Nutzergruppe aktualisiert werden sollen. Die Datei kann im JSON- oder YAML-Format vorliegen. Es kann sich um einen Dateipfad handeln oder den JSON- oder YAML-Inhalt direkt enthalten.

      Die Themendatei verwendet eine JSON-Struktur, um eine ConsumerGroup-Themenzuordnung darzustellen, in der Form { topicName1: {ConsumerPartitionMetadata}, topicName2:{ConsumerPartitionMetadata}}. ConsumerPartitionMetadata enthält für jedes Thema das Offset und die Metadaten für jede Partition.

      Wenn Sie das Offset für eine einzelne Partition (Partition 0) in einem Thema mit dem Namen topic1 auf 10 festlegen möchten, sieht die JSON-Konfiguration so aus:{"topic1": {"partitions": { 0 : { "offset": 10, "metadata": ""}}}}

      Im Folgenden sehen Sie ein Beispiel für den Inhalt einer topics.json-Datei:

      {
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            },
            "2": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        },
        "projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_NAME/TOPICS/OTHER_TOPIC_NAME": {
          "partitions": {
            "1": {
              "offset": "1",
              "metadata": "metadata"
            }
          }
        }
      }

    • TOPIC_PATH: Wenn Sie Themen in einer JSON- oder YAML-Datei angeben, fügen Sie den vollständigen Themenpfad ein. Sie erhalten ihn, indem Sie den gcloud managed-kafak topics describe Befehl ausführen. Er hat das Format projects/PROJECT_NUMBER/locations/LOCATION/clusters/CLUSTER_ID/topics/topic. .

Go

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateConsumerGroup(w io.Writer, projectID, region, clusterID, consumerGroupID, topicPath string, partitionOffsets map[int32]int64, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// consumerGroupID := "my-consumer-group"
	// topicPath := "my-topic-path"
	// partitionOffsets := map[int32]int64{1: 10, 2: 20, 3: 30}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	consumerGroupPath := fmt.Sprintf("%s/consumerGroups/%s", clusterPath, consumerGroupID)

	partitionMetadata := make(map[int32]*managedkafkapb.ConsumerPartitionMetadata)
	for partition, offset := range partitionOffsets {
		partitionMetadata[partition] = &managedkafkapb.ConsumerPartitionMetadata{
			Offset: offset,
		}
	}
	topicConfig := map[string]*managedkafkapb.ConsumerTopicMetadata{
		topicPath: {
			Partitions: partitionMetadata,
		},
	}
	consumerGroupConfig := managedkafkapb.ConsumerGroup{
		Name:   consumerGroupPath,
		Topics: topicConfig,
	}
	paths := []string{"topics"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateConsumerGroupRequest{
		UpdateMask:    updateMask,
		ConsumerGroup: &consumerGroupConfig,
	}
	consumerGroup, err := client.UpdateConsumerGroup(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateConsumerGroup got err: %w", err)
	}
	fmt.Fprintf(w, "Updated consumer group: %#v\n", consumerGroup)
	return nil
}

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConsumerGroup;
import com.google.cloud.managedkafka.v1.ConsumerGroupName;
import com.google.cloud.managedkafka.v1.ConsumerPartitionMetadata;
import com.google.cloud.managedkafka.v1.ConsumerTopicMetadata;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateConsumerGroupRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class UpdateConsumerGroup {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    String consumerGroupId = "my-consumer-group";
    Map<Integer, Integer> partitionOffsets =
        new HashMap<Integer, Integer>() {
          {
            put(1, 10);
            put(2, 20);
            put(3, 30);
          }
        };
    updateConsumerGroup(projectId, region, clusterId, topicId, consumerGroupId, partitionOffsets);
  }

  public static void updateConsumerGroup(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      String consumerGroupId,
      Map<Integer, Integer> partitionOffsets)
      throws Exception {
    TopicName topicName = TopicName.of(projectId, region, clusterId, topicId);
    ConsumerGroupName consumerGroupName =
        ConsumerGroupName.of(projectId, region, clusterId, consumerGroupId);

    Map<Integer, ConsumerPartitionMetadata> partitions =
        new HashMap<Integer, ConsumerPartitionMetadata>() {
          {
            for (Entry<Integer, Integer> partitionOffset : partitionOffsets.entrySet()) {
              ConsumerPartitionMetadata partitionMetadata =
                  ConsumerPartitionMetadata.newBuilder()
                      .setOffset(partitionOffset.getValue())
                      .build();
              put(partitionOffset.getKey(), partitionMetadata);
            }
          }
        };
    ConsumerTopicMetadata topicMetadata =
        ConsumerTopicMetadata.newBuilder().putAllPartitions(partitions).build();
    ConsumerGroup consumerGroup =
        ConsumerGroup.newBuilder()
            .setName(consumerGroupName.toString())
            .putTopics(topicName.toString(), topicMetadata)
            .build();
    FieldMask updateMask = FieldMask.newBuilder().addPaths("topics").build();

    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateConsumerGroupRequest request =
          UpdateConsumerGroupRequest.newBuilder()
              .setUpdateMask(updateMask)
              .setConsumerGroup(consumerGroup)
              .build();
      // This operation is being handled synchronously.
      ConsumerGroup response = managedKafkaClient.updateConsumerGroup(request);
      System.out.printf("Updated consumer group: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateConsumerGroup got err: %s", e.getMessage());
    }
  }
}

Python

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# consumer_group_id = "my-consumer-group"
# topic_path = "my-topic-path"
# partition_offsets = {10: 10}

client = managedkafka_v1.ManagedKafkaClient()

consumer_group = managedkafka_v1.ConsumerGroup()
consumer_group.name = client.consumer_group_path(
    project_id, region, cluster_id, consumer_group_id
)

topic_metadata = managedkafka_v1.ConsumerTopicMetadata()
for partition, offset in partition_offsets.items():
    partition_metadata = managedkafka_v1.ConsumerPartitionMetadata(offset=offset)
    topic_metadata.partitions[partition] = partition_metadata
consumer_group.topics = {
    topic_path: topic_metadata,
}

update_mask = field_mask_pb2.FieldMask()
update_mask.paths.append("topics")

request = managedkafka_v1.UpdateConsumerGroupRequest(
    update_mask=update_mask,
    consumer_group=consumer_group,
)

try:
    response = client.update_consumer_group(request=request)
    print("Updated consumer group:", response)
except NotFound as e:
    print(f"Failed to update consumer group {consumer_group_id} with error: {e.message}")

Nächste Schritte

Apache Kafka® ist eine eingetragene Marke der Apache Software Foundation oder ihrer Tochtergesellschaften in den USA und/oder anderen Ländern.