Thema in Google Cloud Managed Service for Apache Kafka aktualisieren

Nachdem ein Thema erstellt wurde, können Sie die Themenkonfiguration bearbeiten, um diese Eigenschaften zu aktualisieren: die Anzahl der Partitionen und Themenkonfigurationen, die nicht standardmäßig auf die bereits auf Clusterebene festgelegten Eigenschaften festgelegt sind. Sie können die Anzahl der Partitionen nur erhöhen, nicht verringern.

Wenn Sie ein einzelnes Thema aktualisieren möchten, können Sie die Google Cloud Console, die Google Cloud CLI, die Clientbibliothek, die Managed Kafka API oder die Open-Source- Apache Kafka APIs verwenden.

Erforderliche Rollen und Berechtigungen zum Bearbeiten eines Themas

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle „Managed Kafka-Themeneditor“ (roles/managedkafka.topicEditor) für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Bearbeiten eines Themas benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Bearbeiten eines Themas erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen , um die notwendigen Berechtigungen anzuzeigen, die erforderlich sind:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Bearbeiten eines Themas erforderlich:

  • Thema aktualisieren: managedkafka.topics.update

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Thema bearbeiten

So bearbeiten Sie ein Thema:

Console

  1. Rufen Sie in der Google Cloud Console die Seite Cluster auf.

    Zu den Clustern

    Die in einem Projekt erstellten Cluster werden aufgelistet.

  2. Klicken Sie auf den Cluster, zu dem das Thema gehört, das Sie bearbeiten möchten.

    Die Seite Clusterdetails wird geöffnet. Auf der Seite mit den Clusterdetails werden auf dem Tab Ressourcen die Themen aufgelistet.

  3. Klicken Sie auf das Thema, das Sie bearbeiten möchten.

    Die Seite Themendetails wird geöffnet.

  4. Klicken Sie auf Bearbeiten, um die Änderungen vorzunehmen.

  5. Klicken Sie nach den Änderungen auf Speichern.

gcloud

  1. Aktivieren Sie Cloud Shell in der Google Cloud Console.

    Cloud Shell aktivieren

    Unten in der Google Cloud Console wird eine Cloud Shell Sitzung gestartet und eine Eingabeaufforderung angezeigt. Cloud Shell ist eine Shell-Umgebung in der das Google Cloud CLI bereits installiert ist und Werte für Ihr aktuelles Projekt bereits festgelegt sind. Das Initialisieren der Sitzung kann einige Sekunden dauern.

  2. Führen Sie den gcloud managed-kafka topics update Befehl aus:

    gcloud managed-kafka topics update TOPIC_ID \
      --cluster=CLUSTER_ID \
      --location=LOCATION_ID \
      --partitions=PARTITIONS \
      --configs=CONFIGS
    

    Mit diesem Befehl wird die Konfiguration eines vorhandenen Themas im angegebenen Managed Service for Apache Kafka-Cluster geändert. Mit diesem Befehl können Sie die Anzahl der Partitionen erhöhen und Konfigurationseinstellungen auf Themenebene aktualisieren.

    Ersetzen Sie Folgendes:

    • TOPIC_ID: Die ID des Themas.
    • CLUSTER_ID: Die ID des Clusters, der das Thema enthält.
    • LOCATION_ID: Der Standort des Clusters.
    • PARTITIONS: Optional: Die aktualisierte Anzahl der Partitionen für das Thema. Sie können die Anzahl der Partitionen nur erhöhen, nicht verringern.
    • CONFIGS: Optional: Eine Liste der zu aktualisierenden Konfiguration seinstellungen. Geben Sie die Einstellungen als durch Kommas getrennte Liste von Schlüssel/Wert-Paaren an. Beispiel: retention.ms=3600000,retention.bytes=10000000.

REST

Ersetzen Sie diese Werte in den folgenden Anfragedaten:

  • PROJECT_ID: Ihre Google Cloud Projekt-ID
  • LOCATION: Der Standort des Clusters
  • CLUSTER_ID: Die ID des Clusters
  • TOPIC_ID: Die ID des Themas
  • UPDATE_MASK: Die zu aktualisierenden Felder als durch Kommas getrennte Liste mit vollständig qualifizierten Namen. Beispiel: partitionCount
  • PARTITION_COUNT: Die aktualisierte Anzahl der Partitionen für das Thema

HTTP-Methode und URL:

PATCH https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID?updateMask=UPDATE_MASK

JSON-Text anfordern:

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT
}

Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:

Sie sollten eine JSON-Antwort ähnlich wie diese erhalten:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.managedkafka.v1.OperationMetadata",
    "createTime": "CREATE_TIME",
    "target": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID",
    "verb": "update",
    "requestedCancellation": false,
    "apiVersion": "v1"
  },
  "done": false
}

Go

Folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Go API.

Richten Sie Standardanmeldedaten für Anwendungen(ADC) ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
	"google.golang.org/protobuf/types/known/fieldmaskpb"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func updateTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 20
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	TopicConfig := managedkafkapb.Topic{
		Name:           topicPath,
		PartitionCount: partitionCount,
		Configs:        configs,
	}
	paths := []string{"partition_count", "configs"}
	updateMask := &fieldmaskpb.FieldMask{
		Paths: paths,
	}

	req := &managedkafkapb.UpdateTopicRequest{
		UpdateMask: updateMask,
		Topic:      &TopicConfig,
	}
	topic, err := client.UpdateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.UpdateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Updated topic: %#v\n", topic)
	return nil
}

Java

Folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Java API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import com.google.cloud.managedkafka.v1.UpdateTopicRequest;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class UpdateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 200;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "1");
          }
        };
    updateTopic(projectId, region, clusterId, topicId, partitionCount, configs);
  }

  public static void updateTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .putAllConfigs(configs)
            .build();
    String[] paths = {"partition_count", "configs"};
    FieldMask updateMask = FieldMask.newBuilder().addAllPaths(Arrays.asList(paths)).build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setUpdateMask(updateMask).setTopic(topic).build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.updateTopic(request);
      System.out.printf("Updated topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.updateCluster got err: %s", e.getMessage());
    }
  }
}

Python

Folgen Sie der Einrichtungsanleitung für Python unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Python API.

Richten Sie Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

from google.api_core.exceptions import NotFound
from google.cloud import managedkafka_v1
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 20
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.configs = configs
update_mask = field_mask_pb2.FieldMask()
update_mask.paths.extend(["partition_count", "configs"])

# For a list of editable fields, one can check https://cloud.google.com/managed-kafka/docs/create-topic#properties.
request = managedkafka_v1.UpdateTopicRequest(
    update_mask=update_mask,
    topic=topic,
)

try:
    response = client.update_topic(request=request)
    print("Updated topic:", response)
except NotFound as e:
    print(f"Failed to update topic {topic_id} with error: {e.message}")

Nachrichtenspeicherung konfigurieren

Kafka speichert Nachrichten in Logsegmentdateien. Standardmäßig löscht Kafka Segmentdateien nach einer Aufbewahrungsdauer oder wenn eine Partition einen Grenzwert für die Datengröße überschreitet. Sie können dieses Verhalten ändern, indem Sie die Logkomprimierungaktivieren. Wenn die Logkomprimierung aktiviert ist, behält Kafka nur den letzten Wert für jeden Schlüssel bei.

Google Cloud Managed Service for Apache Kafka verwendet mehrstufigen Speicher. Das bedeutet, dass abgeschlossene Logsegmente nicht lokal, sondern remote gespeichert werden. Weitere Informationen zum mehrstufigen Speicher finden Sie in der Apache Kafka-Dokumentation unter Tiered Storage.

Aufbewahrungswerte festlegen

Wenn die Logkomprimierung nicht aktiviert ist, steuern die folgenden Einstellungen, wie Kafka Logsegmentdateien speichert:

  • retention.ms: Die maximale Aufbewahrungsdauer für Segmentdateien in Millisekunden.
  • retention.bytes: Die maximale Anzahl von Byte, die pro Partition gespeichert werden sollen. Wenn die Daten in einer Partition diesen Wert überschreiten, verwirft Kafka ältere Segmentdateien.

Verwenden Sie die gcloud CLI oder die Kafka CLI, um diese Einstellungen zu aktualisieren:

gcloud

Führen Sie den gcloud managed-kafka topics update Befehl aus, um die Nachrichtenspeicherung festzulegen.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Ersetzen Sie Folgendes:

  • TOPIC_ID: Die ID des Themas.
  • CLUSTER_ID: Die ID des Clusters, der das Thema enthält.
  • LOCATION_ID: Der Standort des Clusters.
  • RETENTION_PERIOD: Die maximale Aufbewahrungsdauer für Segmentdateien in Millisekunden.
  • MAX_BYTES: Die maximale Anzahl von Byte, die pro Partition gespeichert werden sollen.

Kafka CLI

Installieren Sie die Kafka-Befehlszeilentools auf einer Compute Engine-VM, bevor Sie diesen Befehl ausführen. Die VM muss ein Subnetz erreichen können, das mit Ihrem Managed Service for Apache Kafka Cluster verbunden ist. Folgen Sie der Anleitung unter Nachrichten mit den Kafka-Befehlszeilentools produzieren und nutzen.

Führen Sie den Befehl kafka-configs.sh aus:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config retention.ms=RETENTION_PERIOD,retention.bytes=MAX_BYTES

Ersetzen Sie Folgendes:

  • BOOTSTRAP_ADDRESS: Die Bootstrap-Adresse des Managed Service for Apache Kafka-Clusters.
  • TOPIC_ID: Die ID des Themas.
  • RETENTION_PERIOD: Die maximale Aufbewahrungsdauer für Segmentdateien in Millisekunden.
  • MAX_BYTES: Die maximale Anzahl von Byte, die pro Partition gespeichert werden sollen.

Logkomprimierung aktivieren

Wenn die Logkomprimierung aktiviert ist, speichert Kafka nur die letzte Nachricht für jeden Schlüssel. Die Logkomprimierung ist standardmäßig deaktiviert. Setzen Sie die Konfiguration cleanup.policy für ein Thema auf "compact", um die Logkomprimierung zu aktivieren:

gcloud

Führen Sie den gcloud managed-kafka topics update Befehl aus.

gcloud managed-kafka topics update TOPIC_ID \
  --cluster=CLUSTER_ID \
  --location=LOCATION_ID \
  --configs=cleanup.policy=compact

Ersetzen Sie Folgendes:

  • TOPIC_ID: Die ID des Themas.
  • CLUSTER_ID: Die ID des Clusters, der das Thema enthält.
  • LOCATION_ID: Der Standort des Clusters.

Kafka CLI

Installieren Sie die Kafka-Befehlszeilentools auf einer Compute Engine-VM, bevor Sie diesen Befehl ausführen. Die VM muss ein Subnetz erreichen können, das mit Ihrem Managed Service for Apache Kafka Cluster verbunden ist. Folgen Sie der Anleitung unter Nachrichten mit den Kafka-Befehlszeilentools produzieren und nutzen.

Führen Sie den Befehl kafka-configs.sh aus:

kafka-configs.sh --alter \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --entity-type topics \
  --entity-name TOPIC_ID \
  --add-config cleanup.policy=compact

Ersetzen Sie Folgendes:

  • BOOTSTRAP_ADDRESS: Die Bootstrap-Adresse des Managed Service for Apache Kafka-Clusters.
  • TOPIC_ID: Die ID des Themas.

Beschränkungen

  • Sie können Themenkonfigurationen für Remote-Speicher wie remote.storage.enable nicht überschreiben.

  • Sie können Themenkonfigurationen für Logsegmentdateien wie segment.bytes nicht überschreiben.

  • Wenn Sie die Logkomprimierung für ein Thema aktivieren, wird der mehrstufige Speicher für dieses Thema implizit deaktiviert. Alle Logdateien für das Thema werden lokal gespeichert.

Nächste Schritte