Thema für Google Cloud Managed Service for Apache Kafka erstellen

In Managed Service for Apache Kafka werden Nachrichten in Themen organisiert. Ein Thema besteht aus Partitionen. Eine Partition ist eine geordnete, unveränderliche Sequenz von Datensätzen, die einem einzelnen Broker in einem Kafka-Cluster gehört. Sie müssen ein Thema erstellen, um Nachrichten zu veröffentlichen oder zu verarbeiten.

Sie können ein Thema mit der Google Cloud Console, der Google Cloud CLI, der Clientbibliothek, der Managed Kafka API oder den Open-Source-Apache Kafka APIs erstellen.

Hinweise

Sie müssen zuerst einen Cluster erstellen, bevor Sie ein Thema erstellen können. Achten Sie darauf, dass Sie Folgendes eingerichtet haben:

Erforderliche Rollen und Berechtigungen zum Erstellen eines Themas

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Topic Editor (roles/managedkafka.topicEditor) für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines Themas benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen eines Themas erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen, um die notwendigen Berechtigungen anzuzeigen:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Erstellen eines Themas erforderlich:

  • Thema erstellen: managedkafka.topics.create

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Die Rolle „Managed Kafka Topic Editor“ enthält auch die Rolle „Managed Kafka Viewer“. Weitere Informationen zu dieser Rolle finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.

Eigenschaften eines Managed Service for Apache Kafka-Themas

Wenn Sie ein Managed Service for Apache Kafka-Thema erstellen oder aktualisieren, müssen Sie die folgenden Eigenschaften angeben.

Name des Themas

Der Name des Managed Service for Apache Kafka-Themas, das Sie erstellen. Tipps zum Benennen eines Themas finden Sie unter Richtlinien zum Benennen einer Managed Service for Apache Kafka-Ressource. Der Name eines Themas kann nicht geändert werden.

Anzahl der Partitionen

Die Anzahl der Partitionen im Thema. Sie können ein Thema bearbeiten, um die Anzahl der Partitionen zu erhöhen, aber nicht zu verringern. Wenn Sie die Anzahl der Partitionen für ein Thema mit einem Schlüssel erhöhen, kann sich die Verteilung der Nachrichten ändern.

Replikationsfaktor

Die Anzahl der Replikate für jede Partition. Wenn Sie den Wert nicht angeben, wird der Standardreplikationsfaktor des Clusters verwendet.

Ein höherer Replikationsfaktor kann die Datenkonsistenz bei Broker-Fehlern verbessern, da Daten auf mehrere Broker repliziert werden. Für Produktionsumgebungen wird ein Replikationsfaktor von mindestens 3 empfohlen. Höhere Replikatzahlen erhöhen die Kosten für die lokale Speicherung und Datenübertragung für das Thema. Sie erhöhen jedoch nicht die Kosten für den persistenten Speicher. Der Replikationsfaktor darf die Anzahl der verfügbaren Broker nicht überschreiten.

Sonstige Parameter

Sie können auch andere Konfigurationsparameter auf Apache Kafka-Themenebene festlegen. Sie werden als key=value-Paare angegeben, die die Clusterstandardwerte überschreiben.

Konfigurationen für Themen haben einen Serverstandardwert und eine optionale Überschreibung pro Thema. Das Format ist eine durch Kommas getrennte Liste von KEY=VALUE-Paaren, wobei KEY der Name der Konfigurationseigenschaft des Kafka-Themas und VALUE die erforderliche Einstellung ist.Mit diesen Schlüssel/Wert-Paaren können Sie die Clusterstandardwerte überschreiben. Beispiele sind flush.ms=10 und compression.type=producer.

Eine Liste aller unterstützten Konfigurationen auf Themenebene finden Sie in der Apache Kafka-Dokumentation unter Topic-level configs.

Thema erstellen

Bevor Sie ein Thema erstellen, sollten Sie sich die Themeneigenschaften ansehen.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Cluster auf.

    Zu den Clustern

  2. Klicken Sie auf den Cluster, für den Sie ein Thema erstellen möchten.

    Die Seite Clusterdetails wird geöffnet.

  3. Klicken Sie auf der Seite mit den Clusterdetails auf Thema erstellen.

    Die Seite Kafka-Thema erstellen wird geöffnet.

  4. Geben Sie für Themenname einen String ein.

  5. Geben Sie für Anzahl der Partitionen die gewünschte Anzahl der Partitionen ein oder behalten Sie den Standardwert bei.

  6. Geben Sie unter Replikationsfaktor den gewünschten Replikationsfaktor ein oder behalten Sie den Standardwert bei.

  7. Optional: Wenn Sie Themenkonfigurationen ändern möchten, fügen Sie sie als durch Kommas getrennte Schlüssel/Wert-Paare im Feld Konfigurationen hinzu.

  8. Klicken Sie auf Erstellen.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Zum Abrufen der aktuellen Richtlinie führen Sie den Befehl gcloud managed-kafka topics create aus:

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    Ersetzen Sie Folgendes:

    • TOPIC_ID: Der Name des Themas.
    • CLUSTER: Der Name des Clusters, in dem Sie das Thema erstellen möchten.
    • LOCATION: Die Region des Clusters.
    • PARTITIONS: Die Anzahl der Partitionen für das Thema.
    • REPLICATION_FACTOR: Der Replikationsfaktor für das Thema.
    • CONFIGS: Optionale Parameter auf Themaebene. Geben Sie die Werte als durch Kommas getrennte Schlüssel/Wert-Paare an. Beispiel: compression.type=producer
  3. Kafka-Befehlszeile

    Bevor Sie diesen Befehl ausführen, installieren Sie die Kafka-Befehlszeilentools auf einer Compute Engine-VM. Die VM muss ein Subnetz erreichen können, das mit Ihrem Managed Service for Apache Kafka-Cluster verbunden ist. Folgen Sie der Anleitung unter Nachrichten mit den Kafka-Befehlszeilentools erstellen und verarbeiten.

    Führen Sie den Befehl kafka-topics.sh so aus:

    kafka-topics.sh --create --if-not-exists \
      --bootstrap-server=BOOTSTRAP_ADDRESS \
      --command-config client.properties \
      --topic TOPIC_ID \
      --partitions PARTITIONS \
      --replication-factor REPLICATION_FACTOR
    

    Ersetzen Sie Folgendes:

    • BOOTSTRAP_ADDRESS: Die Bootstrap-Adresse des Managed Service for Apache Kafka-Clusters.

    • TOPIC_ID: Der Name des Themas.

    • PARTITIONS: Die Anzahl der Partitionen für das Thema.

    • REPLICATION_FACTOR: Der Replikationsfaktor für das Thema.

    REST

    Ersetzen Sie diese Werte in den folgenden Anfragedaten:

    • PROJECT_ID: Ihre Google Cloud Projekt-ID
    • LOCATION: Der Standort des Clusters
    • CLUSTER_ID: Die ID des Clusters
    • TOPIC_ID: ID des Themas
    • PARTITION_COUNT: die Anzahl der Partitionen für das Thema
    • REPLICATION_FACTOR: die Anzahl der Replikate jeder Partition

    HTTP-Methode und URL:

    POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

    JSON-Text anfordern:

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:

    Sie sollten eine JSON-Antwort ähnlich wie diese erhalten:

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Terraform

    Sie können eine Terraform-Ressource verwenden, um ein Thema zu erstellen.

    resource "google_managed_kafka_topic" "default" {
      project            = data.google_project.default.project_id # Replace this with your project ID in quotes
      topic_id           = "my-topic-id"
      cluster            = google_managed_kafka_cluster.default.cluster_id
      location           = "us-central1"
      partition_count    = 2
      replication_factor = 3
    }

    Informationen zum Anwenden oder Entfernen einer Terraform-Konfiguration finden Sie unter Grundlegende Terraform-Befehle.

    Go

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Go API für Managed Service for Apache Kafka.

    Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 10
    	// replicationFactor := 3
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	topicConfig := &managedkafkapb.Topic{
    		Name:              topicPath,
    		PartitionCount:    partitionCount,
    		ReplicationFactor: replicationFactor,
    		Configs:           configs,
    	}
    
    	req := &managedkafkapb.CreateTopicRequest{
    		Parent:  clusterPath,
    		TopicId: topicID,
    		Topic:   topicConfig,
    	}
    	topic, err := client.CreateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
    	return nil
    }
    

    Java

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Java API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.CreateTopicRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 100;
        int replicationFactor = 3;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "2");
              }
            };
        createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
      }
    
      public static void createTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          int replicationFactor,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .setReplicationFactor(replicationFactor)
                .putAllConfigs(configs)
                .build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          CreateTopicRequest request =
              CreateTopicRequest.newBuilder()
                  .setParent(ClusterName.of(projectId, region, clusterId).toString())
                  .setTopicId(topicId)
                  .setTopic(topic)
                  .build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.createTopic(request);
          System.out.printf("Created topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Bevor Sie dieses Beispiel ausprobieren, folgen Sie der Anleitung für die Einrichtung von Python unter Clientbibliotheken installieren. Weitere Informationen finden Sie in der Referenzdokumentation zur Python API für Managed Service for Apache Kafka.

    Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Managed Service for Apache Kafka zu authentifizieren. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

    from google.api_core.exceptions import AlreadyExists
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 10
    # replication_factor = 3
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.replication_factor = replication_factor
    # For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
    topic.configs = configs
    
    request = managedkafka_v1.CreateTopicRequest(
        parent=client.cluster_path(project_id, region, cluster_id),
        topic_id=topic_id,
        topic=topic,
    )
    
    try:
        response = client.create_topic(request=request)
        print("Created topic:", response.name)
    except AlreadyExists as e:
        print(f"Failed to create topic {topic.name} with error: {e.message}")
    

Nächste Schritte