Créer un sujet Google Cloud Managed Service pour Apache Kafka

Dans Managed Service pour Apache Kafka, les messages sont organisés en sujets. Un sujet est constitué de partitions. Une partition est une séquence de données immuable et ordonnée, appartenant à un seul courtier dans un cluster Kafka. Vous devez créer un sujet pour publier ou consommer des messages.

Pour créer un sujet, vous pouvez utiliser la Google Cloud console, la Google Cloud CLI, la bibliothèque cliente, l'API Managed Kafka ou les API Apache Kafka Open Source.

Avant de commencer

Vous devez d'abord créer un cluster avant de créer un sujet. Assurez-vous d'avoir configuré les éléments suivants :

Rôles et autorisations requis pour créer un sujet

Pour obtenir les autorisations nécessaires pour créer un sujet, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de sujet Managed Kafka (roles/managedkafka.topicEditor) sur votre projet. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour créer un sujet. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour créer un sujet :

  • Créer un sujet : managedkafka.topics.create

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Le rôle Éditeur de sujet Managed Kafka contient également le rôle Lecteur Managed Kafka. Pour en savoir plus sur ce rôle, consultez Rôles prédéfinis Managed Service pour Apache Kafka.

Propriétés d'un sujet Managed Service pour Apache Kafka

Lorsque vous créez ou mettez à jour un sujet Managed Service pour Apache Kafka, vous devez spécifier les propriétés suivantes.

Nom du sujet

Nom du sujet Managed Service pour Apache Kafka que vous créez. Pour obtenir des instructions sur la façon de nommer un sujet, consultez Consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un sujet est immuable.

Nombre de partitions

Nombre de partitions dans le sujet. Vous pouvez modifier un sujet pour augmenter le nombre de partitions, mais pas le diminuer. L'augmentation du nombre de partitions pour un sujet qui utilise une clé peut modifier la façon dont les messages sont distribués.

Facteur de réplication

Nombre d'instances répliquées pour chaque partition. Si vous ne spécifiez pas de valeur, le facteur de réplication par défaut du cluster est utilisé.

Un facteur de réplication plus élevé peut améliorer la cohérence des données en cas de défaillance du courtier, car les données sont répliquées sur plusieurs courtiers. Pour les environnements de production, il est recommandé d'utiliser un facteur de réplication de 3 ou plus. Un nombre plus élevé d'instances répliquées augmente les coûts de stockage local et de transfert de données pour le sujet. Toutefois, cela n'augmente pas les coûts de stockage persistant. Le facteur de réplication ne peut pas dépasser le nombre de courtiers disponibles.

Autres paramètres

Vous pouvez également définir d'autres paramètres de configuration Apache Kafka au niveau du sujet. Ils sont spécifiés sous forme de paires key=value qui remplacent les valeurs par défaut du cluster.

Les configurations liées aux sujets ont une valeur par défaut du serveur et un remplacement facultatif par sujet. Le format est une liste de paires KEY=VALUE séparées par une virgule, où KEY est le nom de la propriété de configuration du sujet Kafka et VALUE est le paramètre requis.Ces paires clé/valeur vous aident à remplacer les valeurs par défaut du cluster. Par exemple, flush.ms=10 et compression.type=producer.

Pour obtenir la liste de toutes les configurations compatibles au niveau du sujet, consultez Configurations au niveau du sujet dans la documentation Apache Kafka.

Créer un sujet

Avant de créer un sujet, examinez les propriétés du sujet.

Console

  1. Dans la console Google Cloud , accédez à la page Clusters.

    Accéder aux clusters

  2. Cliquez sur le cluster pour lequel vous souhaitez créer un sujet.

    La page Détails du cluster s'ouvre.

  3. Sur la page d'informations du cluster, cliquez sur Créer un sujet.

    La page Créer un sujet Kafka s'ouvre.

  4. Dans le champ Nom du sujet, saisissez une chaîne.

  5. Dans le champ Nombre de partitions, saisissez le nombre de partitions souhaité ou conservez la valeur par défaut.

  6. Dans le champ Facteur de réplication, saisissez le facteur de réplication souhaité ou conservez la valeur par défaut.

  7. (Facultatif) Pour modifier des configurations de sujet, ajoutez-les sous forme de paires clé/valeur séparées par une virgule dans le champ Configurations.

  8. Cliquez sur Créer.

gcloud

  1. Dans la Google Cloud console, activez Cloud Shell.

    Activer Cloud Shell

    En bas de la Google Cloud console, une session Cloud Shell démarre et affiche une invite de ligne de commande. Cloud Shell est un environnement de shell dans lequel Google Cloud CLI est déjà installé, et dans lequel des valeurs sont déjà définies pour votre projet actuel. L'initialisation de la session peut prendre quelques secondes.

  2. Exécutez la gcloud managed-kafka topics create commande :

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    Remplacez les éléments suivants :

    • TOPIC_ID : nom du sujet.
    • CLUSTER : nom du cluster dans lequel vous souhaitez créer le sujet.
    • LOCATION : région du cluster.
    • PARTITIONS : nombre de partitions pour le sujet.
    • REPLICATION_FACTOR : facteur de réplication pour le sujet.
    • CONFIGS: paramètres facultatifs au niveau du sujet. Spécifiez-les sous forme de paires clé/valeur séparées par une virgule. Exemple : compression.type=producer.

CLI Kafka

Avant d'exécuter cette commande, installez les outils de ligne de commande Kafka sur une VM Compute Engine. La VM doit pouvoir accéder à un sous-réseau connecté à votre cluster Managed Service pour Apache Kafka. Suivez les instructions de la section Produire et consommer des messages avec les outils de ligne de commande Kafka.

Exécutez la commande kafka-topics.sh comme suit :

kafka-topics.sh --create --if-not-exists \
  --bootstrap-server=BOOTSTRAP_ADDRESS \
  --command-config client.properties \
  --topic TOPIC_ID \
  --partitions PARTITIONS \
  --replication-factor REPLICATION_FACTOR

Remplacez les éléments suivants :

REST

Avant d'utiliser les données de requête ci-dessous, effectuez les remplacements suivants :

  • PROJECT_ID: ID du Google Cloud projet
  • LOCATION : emplacement du cluster
  • CLUSTER_ID : ID du cluster
  • TOPIC_ID : ID du sujet
  • PARTITION_COUNT : nombre de partitions pour le sujet
  • REPLICATION_FACTOR : nombre d'instances répliquées de chaque partition

Méthode HTTP et URL :

POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

Corps JSON de la requête :

{
  "name": "TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Pour envoyer votre requête, développez l'une des options suivantes :

Vous devriez recevoir une réponse JSON de ce type :

{
  "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
  "partitionCount": PARTITION_COUNT,
  "replicationFactor": REPLICATION_FACTOR
}

Terraform

Vous pouvez utiliser une ressource Terraform pour créer un sujet.

resource "google_managed_kafka_topic" "default" {
  project            = data.google_project.default.project_id # Replace this with your project ID in quotes
  topic_id           = "my-topic-id"
  cluster            = google_managed_kafka_cluster.default.cluster_id
  location           = "us-central1"
  partition_count    = 2
  replication_factor = 3
}

Pour savoir comment appliquer ou supprimer une configuration Terraform, consultez Commandes Terraform de base.

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application(ADC). Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
)

func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// clusterID := "my-cluster"
	// topicID := "my-topic"
	// partitionCount := 10
	// replicationFactor := 3
	// configs := map[string]string{"min.insync.replicas":"1"}
	ctx := context.Background()
	client, err := managedkafka.NewClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
	}
	defer client.Close()

	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
	topicConfig := &managedkafkapb.Topic{
		Name:              topicPath,
		PartitionCount:    partitionCount,
		ReplicationFactor: replicationFactor,
		Configs:           configs,
	}

	req := &managedkafkapb.CreateTopicRequest{
		Parent:  clusterPath,
		TopicId: topicID,
		Topic:   topicConfig,
	}
	topic, err := client.CreateTopic(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateTopic got err: %w", err)
	}
	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Java.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ClusterName;
import com.google.cloud.managedkafka.v1.CreateTopicRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
import com.google.cloud.managedkafka.v1.Topic;
import com.google.cloud.managedkafka.v1.TopicName;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreateTopic {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String clusterId = "my-cluster";
    String topicId = "my-topic";
    int partitionCount = 100;
    int replicationFactor = 3;
    Map<String, String> configs =
        new HashMap<String, String>() {
          {
            put("min.insync.replicas", "2");
          }
        };
    createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
  }

  public static void createTopic(
      String projectId,
      String region,
      String clusterId,
      String topicId,
      int partitionCount,
      int replicationFactor,
      Map<String, String> configs)
      throws Exception {
    Topic topic =
        Topic.newBuilder()
            .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
            .setPartitionCount(partitionCount)
            .setReplicationFactor(replicationFactor)
            .putAllConfigs(configs)
            .build();
    try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
      CreateTopicRequest request =
          CreateTopicRequest.newBuilder()
              .setParent(ClusterName.of(projectId, region, clusterId).toString())
              .setTopicId(topicId)
              .setTopic(topic)
              .build();
      // This operation is being handled synchronously.
      Topic response = managedKafkaClient.createTopic(request);
      System.out.printf("Created topic: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
    }
  }
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python de Managed Service pour Apache Kafka.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

from google.api_core.exceptions import AlreadyExists
from google.cloud import managedkafka_v1

# TODO(developer)
# project_id = "my-project-id"
# region = "us-central1"
# cluster_id = "my-cluster"
# topic_id = "my-topic"
# partition_count = 10
# replication_factor = 3
# configs = {"min.insync.replicas": "1"}

client = managedkafka_v1.ManagedKafkaClient()

topic = managedkafka_v1.Topic()
topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
topic.partition_count = partition_count
topic.replication_factor = replication_factor
# For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
topic.configs = configs

request = managedkafka_v1.CreateTopicRequest(
    parent=client.cluster_path(project_id, region, cluster_id),
    topic_id=topic_id,
    topic=topic,
)

try:
    response = client.create_topic(request=request)
    print("Created topic:", response.name)
except AlreadyExists as e:
    print(f"Failed to create topic {topic.name} with error: {e.message}")

Étape suivante