Thema in Google Cloud Managed Service for Apache Kafka aktualisieren

Nachdem ein Thema erstellt wurde, können Sie die Themenkonfiguration bearbeiten, um diese Attribute zu aktualisieren: die Anzahl der Partitionen und Themenkonfigurationen, die nicht standardmäßig auf die bereits auf Clusterebene festgelegten Attribute festgelegt sind. Sie können die Anzahl der Partitionen nur erhöhen, nicht verringern.

Wenn Sie ein einzelnes Thema aktualisieren möchten, können Sie die Google Cloud Konsole, die Google Cloud CLI, die Clientbibliothek, die Managed Kafka API oder die Open-Source-Apache Kafka APIs verwenden.

Erforderliche Rollen und Berechtigungen zum Bearbeiten eines Themas

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle „Managed Kafka Topic Editor“ (roles/managedkafka.topicEditor) für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Bearbeiten eines Themas benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Bearbeiten eines Themas erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Bearbeiten eines Themas erforderlich:

  • Thema aktualisieren: managedkafka.topics.update

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Weitere Informationen zu dieser Rolle finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.

Thema bearbeiten

So bearbeiten Sie ein Thema:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Cluster auf.

    Zu den Clustern

    Die Cluster, die Sie in einem Projekt erstellt haben, werden aufgelistet.

  2. Klicken Sie auf den Cluster, zu dem das Thema gehört, das Sie bearbeiten möchten.

    Die Seite Clusterdetails wird geöffnet. Auf der Seite mit den Clusterdetails werden die Themen auf dem Tab Ressourcen aufgeführt.

  3. Klicken Sie auf das Thema, das Sie bearbeiten möchten.

    Die Seite Themendetails wird geöffnet.

  4. Klicken Sie auf Bearbeiten, um Änderungen vorzunehmen.

  5. Klicken Sie nach den Änderungen auf Speichern.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Zum Abrufen der aktuellen Richtlinie führen Sie den Befehl gcloud managed-kafka topics update aus:

    gcloud managed-kafka topics update TOPIC_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION_ID \
      --partitions=PARTITIONS \
      --configs=CONFIGS
    

    Mit diesem Befehl wird die Konfiguration eines vorhandenen Themas im angegebenen Managed Service for Apache Kafka-Cluster geändert. Mit diesem Befehl können Sie die Anzahl der Partitionen erhöhen und die Konfigurationseinstellungen auf Themenebene aktualisieren.

    Ersetzen Sie Folgendes:

    • TOPIC_ID: Die ID des Themas.
    • CLUSTER_ID: Die ID des Clusters, der das Thema enthält.
    • LOCATION_ID: Der Standort des Clusters.
    • PARTITIONS: Optional: Die aktualisierte Anzahl der Partitionen für das Thema. Sie können die Anzahl der Partitionen nur erhöhen, nicht verringern.
    • CONFIGS (optional): Eine Liste der zu aktualisierenden Konfigurationseinstellungen. Geben Sie die Werte als durch Kommas getrennte Liste von Schlüssel/Wert-Paaren an. Beispiel: retention.ms=3600000,retention.bytes=10000000.
  3. REST

    Ersetzen Sie diese Werte in den folgenden Anfragedaten:

    • PROJECT_ID: Ihre Google Cloud Projekt-ID
    • LOCATION: Der Standort des Clusters
    • CLUSTER_ID: Die ID des Clusters
    • TOPIC_ID: ID des Themas
    • UPDATE_MASK: Die Felder, die aktualisiert werden sollen, als durch Kommas getrennte Liste vollständig qualifizierter Namen. Beispiel: partitionCount
    • PARTITION_COUNT: die aktualisierte Anzahl der Partitionen für das Thema

    HTTP-Methode und URL:

    PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK

    JSON-Text anfordern:

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT
    }
    

    Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:

    Sie sollten eine JSON-Antwort ähnlich wie diese erhalten:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
      "metadata": {
        "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
        "createTime": "CREATE_TIME",
        "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
        "verb": "update",
        "requestedCancellation": false,
        "apiVersion": "v1"
      },
      "done": false
    }
    

    Go

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.

    Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    	"google.golang.org/protobuf/types/known/fieldmaskpb"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 20
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	TopicConfig := managedkafkapb.Topic{
    		Name:           topicPath,
    		PartitionCount: partitionCount,
    		Configs:        configs,
    	}
    	paths := []string{"partition_count", "configs"}
    	updateMask := &fieldmaskpb.FieldMask{
    		Paths: paths,
    	}
    
    	req := &managedkafkapb.UpdateTopicRequest{
    		UpdateMask: updateMask,
    		Topic:      &TopicConfig,
    	}
    	topic, err := client.UpdateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.UpdateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
    	return nil
    }
    

    Java

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
    import com.google.protobuf.FieldMask;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    public class UpdateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 200;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "1");
              }
            };
        updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
      }
    
      public static void updateTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .putAllConfigs(configs)
                .build();
        String[] paths = {"partition_count", "configs"};
        FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          UpdateTopicRequest request =
              UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.updateTopic(request);
          System.out.printf("Updated topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    from google.api_core.exceptions import NotFound
    from google.cloud import managedkafka_v1
    from google.protobuf import field_mask_pb2
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 20
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.configs = configs
    update_mask = field_mask_pb2.FieldMask()
    update_mask.paths.extend(["partition_count", "configs"])
    
    # For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
    request = managedkafka_v1.UpdateTopicRequest(
        update_mask=update_mask,
        topic=topic,
    )
    
    try:
        response = client.update_topic(request=request)
        print("Updated topic:", response)
    except NotFound as e:
        print(f"Failed to update topic {topic_id} with error: {e.message}")
    

Nachrichtenspeicherung konfigurieren

Kafka speichert Nachrichten in Logsegmentdateien. Standardmäßig löscht Kafka Segmentdateien nach einem Aufbewahrungszeitraum oder wenn eine Partition einen Grenzwert für die Datengröße überschreitet. Sie können dieses Verhalten ändern, indem Sie die Log-Kompaktierung aktivieren. Wenn die Logverdichtung aktiviert ist, behält Kafka nur den letzten Wert für jeden Schlüssel bei.

Google Cloud Managed Service for Apache Kafka verwendet Tiered Storage. Das bedeutet, dass abgeschlossene Logsegmente nicht lokal, sondern remote gespeichert werden. Weitere Informationen zum Tiered Storage finden Sie in der Apache Kafka-Dokumentation unter Tiered Storage.

Aufbewahrungswerte festlegen

Wenn die Logverdichtung nicht aktiviert ist, steuern die folgenden Einstellungen, wie Kafka Logsegmentdateien speichert:

  • retention.ms: Die maximale Zeitspanne zum Speichern von Segmentdateien in Millisekunden.
  • retention.bytes: Die maximale Anzahl von Byte, die pro Partition gespeichert werden sollen. Wenn die Daten in einer Partition diesen Wert überschreiten, werden ältere Segmentdateien von Kafka verworfen.

Verwenden Sie zum Aktualisieren dieser Einstellungen entweder die gcloud CLI oder die Kafka-CLI:

gcloud

Führen Sie den Befehl gcloud managed-kafka topics update aus, um die Aufbewahrung von Nachrichten festzulegen.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Ersetzen Sie Folgendes:

  • TOPIC_ID: Die ID des Themas.
  • CLUSTER_ID: Die ID des Clusters, der das Thema enthält.
  • LOCATION_ID: Der Standort des Clusters.
  • RETENTION_PERIOD: Die maximale Zeit, die Segmentdateien gespeichert werden sollen, in Millisekunden.
  • MAX_BYTES: Die maximale Anzahl von Byte, die pro Partition gespeichert werden sollen.

Kafka-Befehlszeile

Bevor Sie diesen Befehl ausführen, installieren Sie die Kafka-Befehlszeilentools auf einer Compute Engine-VM. Die VM muss ein Subnetz erreichen können, das mit Ihrem Managed Service for Apache Kafka-Cluster verbunden ist. Folgen Sie der Anleitung unter Nachrichten mit den Kafka-Befehlszeilentools erstellen und verarbeiten.

Führen Sie den Befehl kafka-configs.sh aus:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Ersetzen Sie Folgendes:

  • BOOTSTRAP_ADDRESS: Die Bootstrap-Adresse des Managed Service for Apache Kafka-Clusters.
  • TOPIC_ID: Die ID des Themas.
  • RETENTION_PERIOD: Die maximale Zeit, die Segmentdateien gespeichert werden sollen, in Millisekunden.
  • MAX_BYTES: Die maximale Anzahl von Byte, die pro Partition gespeichert werden sollen.

Log-Kompaktierung aktivieren

Wenn die Log-Kompaktierung aktiviert ist, speichert Kafka nur die letzte Nachricht für jeden Schlüssel. Die Protokollkomprimierung ist standardmäßig deaktiviert. Um die Logverdichtung für ein Thema zu aktivieren, setzen Sie die Konfiguration cleanup.policy auf "compact", wie unten dargestellt:

gcloud

Führen Sie den Befehl gcloud managed-kafka topics update aus.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=cleanup.policy=compact

Ersetzen Sie Folgendes:

  • TOPIC_ID: Die ID des Themas.
  • CLUSTER_ID: Die ID des Clusters, der das Thema enthält.
  • LOCATION_ID: Der Standort des Clusters.

Kafka-Befehlszeile

Bevor Sie diesen Befehl ausführen, installieren Sie die Kafka-Befehlszeilentools auf einer Compute Engine-VM. Die VM muss ein Subnetz erreichen können, das mit Ihrem Managed Service for Apache Kafka-Cluster verbunden ist. Folgen Sie der Anleitung unter Nachrichten mit den Kafka-Befehlszeilentools erstellen und verarbeiten.

Führen Sie den Befehl kafka-configs.sh aus:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config cleanup.policy=compact

Ersetzen Sie Folgendes:

  • BOOTSTRAP_ADDRESS: Die Bootstrap-Adresse des Managed Service for Apache Kafka-Clusters.
  • TOPIC_ID: Die ID des Themas.

Beschränkungen

  • Sie können keine Themenkonfigurationen für den Remotespeicher überschreiben, z. B. remote.storage.enable.

  • Sie können Themenkonfigurationen für Logsegmentdateien wie segment.bytes nicht überschreiben.

  • Wenn Sie die Log-Kompaktierung für ein Thema aktivieren, wird die mehrstufige Speicherung für dieses Thema implizit deaktiviert. Alle Log-Dateien für das Thema werden lokal gespeichert.

Nächste Schritte