Thema für Google Cloud Managed Service for Apache Kafka erstellen

In Managed Service for Apache Kafka werden Nachrichten in Themen organisiert. Ein Thema besteht aus Partitionen. Eine Partition ist eine sortierte, unveränderliche Sequenz von Datensätzen, die einem einzelnen Broker in einem Kafka-Cluster gehört. Sie müssen ein Thema erstellen, um Nachrichten zu veröffentlichen oder zu nutzen.

Sie können ein Thema über die Google Cloud Console, die Google Cloud CLI, die Clientbibliothek, die Managed Kafka API oder die Open-Source- Apache Kafka APIs erstellen.

Hinweis

Sie müssen zuerst einen Cluster erstellen, bevor Sie ein Thema erstellen können. Achten Sie darauf, dass Sie Folgendes eingerichtet haben:

Erforderliche Rollen und Berechtigungen zum Erstellen eines Themas

Bitten Sie Ihren Administrator, Ihnen die IAM-Rolle Managed Kafka Topic Editor (roles/managedkafka.topicEditor) für Ihr Projekt zu erteilen, um die Berechtigungen zu erhalten, die Sie zum Erstellen eines Themas benötigen. Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Diese vordefinierte Rolle enthält die Berechtigungen, die zum Erstellen eines Themas erforderlich sind. Maximieren Sie den Abschnitt Erforderliche Berechtigungen , um die notwendigen Berechtigungen anzuzeigen, die erforderlich sind:

Erforderliche Berechtigungen

Die folgenden Berechtigungen sind zum Erstellen eines Themas erforderlich:

  • Thema erstellen: managedkafka.topics.create

Sie können diese Berechtigungen auch mit benutzerdefinierten Rollen oder anderen vordefinierten Rollen erhalten.

Die Rolle „Managed Kafka Topic Editor“ enthält auch die Rolle „Managed Kafka Viewer“. Weitere Informationen zu dieser Rolle finden Sie unter Vordefinierte Rollen für Managed Service for Apache Kafka.

Eigenschaften eines Managed Service for Apache Kafka-Themas

Wenn Sie ein Managed Service for Apache Kafka-Thema erstellen oder aktualisieren, müssen Sie die folgenden Eigenschaften angeben.

Name des Themas

Der Name des zu erstellenden Managed Service for Apache Kafka-Themas. Tipps zum Benennen eines Themas finden Sie unter Richtlinien zum Benennen einer Ressource von Managed Service for Apache Kafka. Der Name eines Themas kann nicht geändert werden.

Anzahl der Partitionen

Die Anzahl der Partitionen im Thema. Sie können ein Thema bearbeiten, um die Anzahl der Partitionen zu erhöhen, aber nicht verringern. Wenn Sie die Anzahl der Partitionen für ein Thema erhöhen, das einen Schlüssel verwendet, kann sich die Verteilung der Nachrichten ändern.

Replikationsfaktor

Die Anzahl der Replikate für jede Partition. Wenn Sie den Wert nicht angeben, wird der Standardreplikationsfaktor des Clusters verwendet.

Ein höherer Replikationsfaktor kann die Datenkonsistenz bei Broker-Ausfällen verbessern, da Daten auf mehrere Broker repliziert werden. Für Produktionsumgebungen wird ein Replikationsfaktor von 3 oder höher empfohlen. Eine höhere Anzahl von Replikaten erhöht die Kosten für lokalen Speicher und Datenübertragung für das Thema. Die Kosten für den persistenten Speicher steigen dadurch jedoch nicht. Der Replikationsfaktor darf die Anzahl der verfügbaren Broker nicht überschreiten.

Sonstige Parameter

Sie können auch andere Konfigurationsparameter auf Themenebene für Apache Kafka festlegen. Diese werden als key=value-Paare angegeben, die die Cluster-Standardwerte überschreiben.

Konfigurationen im Zusammenhang mit Themen haben einen Serverstandardwert und eine optionale Überschreibung pro Thema. Das Format ist eine durch Kommas getrennte Liste von KEY=VALUE Paaren, wobei KEY der Name der Konfigurationseigenschaft des Kafka-Themas und VALUE die erforderliche Einstellung ist.Mit diesen Schlüssel/Wert-Paaren können Sie die Cluster Standardwerte überschreiben. Beispiele sind flush.ms=10 und compression.type=producer.

Eine Liste aller unterstützten Konfigurationen auf Themenebene finden Sie Topic-level configs in der Apache Kafka-Dokumentation.

Thema erstellen

Prüfen Sie vor dem Erstellen eines Themas die Themeneigenschaften.

Console

  1. Rufen Sie in der Google Cloud Console die Seite Cluster auf.

    Zu den Clustern

  2. Klicken Sie auf den Cluster, für den Sie ein Thema erstellen möchten.

    Die Seite Clusterdetails wird geöffnet.

  3. Klicken Sie auf der Seite mit den Clusterdetails auf Thema erstellen.

    Die Seite Kafka-Thema erstellen wird geöffnet.

  4. Geben Sie für Name des Themas einen String ein.

  5. Geben Sie für Anzahl der Partitionen die gewünschte Anzahl der Partitionen ein oder behalten Sie den Standardwert bei.

  6. Geben Sie für Replikationsfaktor den gewünschten Replikationsfaktor ein oder behalten Sie den Standardwert bei.

  7. (Optional) Wenn Sie Themenkonfigurationen ändern möchten, fügen Sie sie als durch Kommas getrennte Schlüssel/Wert-Paare in das Feld Konfigurationen ein.

  8. Klicken Sie auf Erstellen.

gcloud

  1. Aktivieren Sie Cloud Shell in der Google Cloud Console.

    Cloud Shell aktivieren

    Unten in der Google Cloud Console wird eine Cloud Shell Sitzung gestartet und eine Befehlszeilenaufforderung angezeigt. Cloud Shell ist eine Shell-Umgebung in der das Google Cloud CLI bereits installiert ist und Werte für Ihr aktuelles Projekt bereits festgelegt sind. Das Initialisieren der Sitzung kann einige Sekunden dauern.

  2. Führen Sie den gcloud managed-kafka topics create Befehl aus:

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    Ersetzen Sie Folgendes:

    • TOPIC_ID: Der Name des Themas.
    • CLUSTER: Der Name des Clusters, in dem Sie das Thema erstellen möchten.
    • LOCATION: Die Region des Clusters.
    • PARTITIONS: Die Anzahl der Partitionen für das Thema.
    • REPLICATION_FACTOR: Der Replikationsfaktor für das Thema.
    • CONFIGS: Optionale Parameter auf Themenebene. Geben Sie sie als durch Kommas getrennte Schlüssel/Wert-Paare an. Beispiel: compression.type=producer.

Kafka-Befehlszeile

Bevor Sie diesen Befehl ausführen, installieren Sie die Kafka-Befehlszeilentools auf einer Compute Engine-VM. Die VM muss ein Subnetz erreichen können, das mit Ihrem Managed Service for Apache Kafka Cluster verbunden ist. Folgen Sie der Anleitung unter Nachrichten mit den Kafka-Befehlszeilentools produzieren und nutzen.

Führen Sie den Befehl kafka-topics.sh so aus:

kafka-topics.sh --create --if-not-exists \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --topic TOPIC_ID \
  --partitions PARTITIONS \
  --replication-factor REPLICATION_FACTOR

Ersetzen Sie Folgendes:

  • BOOTSTRAP_ADDRESS: Die Bootstrap-Adresse des Managed Service for Apache Kafka-Clusters.

  • TOPIC_ID: Der Name des Themas.

  • PARTITIONS: Die Anzahl der Partitionen für das Thema.

  • REPLICATION_FACTOR: Der Replikationsfaktor für das Thema.

REST

Ersetzen Sie diese Werte in den folgenden Anfragedaten:

  • PROJECT_ID: Ihre Google Cloud Projekt-ID
  • LOCATION: Der Standort des Clusters
  • CLUSTER_ID: Die ID des Clusters
  • TOPIC_ID: Die ID des Themas
  • PARTITION_COUNT: Die Anzahl der Partitionen für das Thema
  • REPLICATION_FACTOR: Die Anzahl der Replikate jeder Partition

HTTP-Methode und URL:

POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

JSON-Text anfordern:

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Wenn Sie die Anfrage senden möchten, maximieren Sie eine der folgenden Optionen:

Sie sollten eine JSON-Antwort ähnlich wie diese erhalten:

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Terraform

Sie können eine Terraform-Ressource verwenden, um ein Thema zu erstellen.

resource "google_managed_kafka_topic" "default" {
  project            = data.google_project.default.project_id # Replace this with your project ID in quotes
  topic_id           = "my-topic-id"
  cluster            = google_managed_kafka_cluster.default.cluster_id
  location           = "us-central1"
  partition_count    = 2
  replication_factor = 3
}

Informationen zum Anwenden oder Entfernen einer Terraform-Konfiguration finden Sie unter Grundlegende Terraform-Befehle.

Go

Folgen Sie der Einrichtungsanleitung für Go unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Go API.

Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen(Application Default Credentials, ADC) ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 10
	// replicationFactor := 3
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	topicConfig := &managedkafkapb.Topic{
		Name:              topicPath,
		PartitionCount:    partitionCount,
		ReplicationFactor: replicationFactor,
		Configs:           configs,
	}

	req := &managedkafkapb.CreateTopicRequest{
		Parent:  clusterPath,
		TopicId: topicID,
		Topic:   topicConfig,
	}
	topic, err := client.CreateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
	return nil
}

Java

Folgen Sie der Einrichtungsanleitung für Java unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Java API.

Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.CreateTopicRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 100;
    int replicationFactor = 3;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "2");
          }
        };
    createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
  }

  public static void createTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      int replicationFactor,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .setReplicationFactor(replicationFactor)
            .putAllConfigs(configs)
            .build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      CreateTopicRequest request =
          CreateTopicRequest.newBuilder()
              .setParent(ClusterName.of(projectId, region, clusterId).toString())
              .setTopicId(topicId)
              .setTopic(topic)
              .build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.createTopic(request);
      System.out.printf("Created topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
    }
  }
}

Python

Folgen Sie der Einrichtungsanleitung für Python unter Clientbibliotheken installieren, bevor Sie dieses Beispiel anwenden. Weitere Informationen finden Sie in der Referenzdokumentation zur Managed Service for Apache Kafka Python API.

Richten Sie zur Authentifizierung bei Managed Service for Apache Kafka die Standardanmeldedaten für Anwendungen ein. Weitere Informationen finden Sie unter ADC für eine lokale Entwicklungsumgebung einrichten.

from google.api_core.exceptions import AlreadyExists
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 10
# replication_factor = 3
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.replication_factor = replication_factor
# For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
topic.configs = configs

request = managedkafka_v1.CreateTopicRequest(
    parent=client.cluster_path(project_id, region, cluster_id),
    topic_id=topic_id,
    topic=topic,
)

try:
    response = client.create_topic(request=request)
    print("Created topic:", response.name)
except AlreadyExists as e:
    print(f"Failed to create topic {topic.name} with error: {e.message}")

Nächste Schritte