Créer un sujet Google Cloud Managed Service pour Apache Kafka

Dans Managed Service pour Apache Kafka, les messages sont organisés en sujets. Un sujet est constitué de partitions. Une partition est une séquence de données immuable et ordonnée qui appartient à un seul courtier dans un cluster Kafka. Vous devez créer un sujet pour publier ou consommer des messages.

Pour créer un sujet, vous pouvez utiliser la console Google Cloud , la Google Cloud CLI, la bibliothèque cliente, l'API Managed Kafka ou les API Apache Kafka Open Source.

Avant de commencer

Vous devez d'abord créer un cluster avant de créer un sujet. Assurez-vous d'avoir configuré les éléments suivants :

Rôles et autorisations requis pour créer un sujet

Pour obtenir les autorisations nécessaires pour créer un sujet, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de sujets Kafka gérés (roles/managedkafka.topicEditor) sur votre projet. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour créer un sujet. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour créer un sujet :

  • Créez un sujet : managedkafka.topics.create

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Le rôle "Éditeur de sujet Managed Kafka" contient également le rôle "Lecteur Managed Kafka". Pour en savoir plus sur ce rôle, consultez Rôles prédéfinis de Managed Service pour Apache Kafka.

Propriétés d'un sujet Managed Service pour Apache Kafka

Lorsque vous créez ou mettez à jour un sujet Managed Service pour Apache Kafka, vous devez spécifier les propriétés suivantes.

Nom du sujet

Nom du sujet Managed Service pour Apache Kafka que vous créez. Pour obtenir des instructions sur la façon de nommer un sujet, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un thème est immuable.

Nombre de partitions

Nombre de partitions dans le sujet. Vous pouvez modifier un sujet pour augmenter le nombre de partitions, mais pas le diminuer. L'augmentation du nombre de partitions pour un sujet qui utilise une clé peut modifier la façon dont les messages sont distribués.

Facteur de réplication

Nombre de réplicas pour chaque partition. Si vous ne spécifiez pas de valeur, le facteur de réplication par défaut du cluster est utilisé.

Un facteur de réplication plus élevé peut améliorer la cohérence des données en cas de défaillance d'un courtier, car les données sont répliquées sur plusieurs courtiers. Pour les environnements de production, nous vous recommandons d'utiliser un facteur de réplication de 3 ou plus. Plus le nombre de répliques est élevé, plus les coûts de stockage local et de transfert de données pour le sujet sont élevés. Toutefois, ils n'augmentent pas les coûts de stockage persistant. Le facteur de réplication ne peut pas dépasser le nombre de courtiers disponibles.

Autres paramètres

Vous pouvez également définir d'autres paramètres de configuration Apache Kafka au niveau du sujet. Elles sont spécifiées sous forme de paires key=value qui remplacent les valeurs par défaut du cluster.

Les configurations liées aux thèmes ont une valeur par défaut au niveau du serveur et un remplacement facultatif par thème. Le format est une liste de paires KEY=VALUE séparées par une virgule, où KEY est le nom de la propriété de configuration du sujet Kafka et VALUE le paramètre requis.Ces paires clé-valeur vous aident à remplacer les valeurs par défaut du cluster. Par exemple, flush.ms=10 et compression.type=producer ne sont pas autorisés.

Pour obtenir la liste de toutes les configurations compatibles au niveau du sujet, consultez Configurations au niveau du sujet dans la documentation Apache Kafka.

Créer un sujet

Avant de créer un thème, consultez les propriétés des thèmes.

Console

  1. Dans la console Google Cloud , accédez à la page Clusters.

    accéder aux clusters

  2. Cliquez sur le cluster pour lequel vous souhaitez créer un sujet.

    La page Détails du cluster s'ouvre.

  3. Sur la page "Détails du cluster", cliquez sur Créer un sujet.

    La page Créer un sujet Kafka s'ouvre.

  4. Dans le champ Nom du thème, saisissez une chaîne.

  5. Pour Nombre de partitions, saisissez le nombre de partitions souhaité ou conservez la valeur par défaut.

  6. Pour Facteur de réplication, saisissez le facteur de réplication souhaité ou conservez la valeur par défaut.

  7. (Facultatif) Pour modifier les configurations d'un sujet, ajoutez-les sous forme de paires clé/valeur séparées par une virgule dans le champ Configurations.

  8. Cliquez sur Créer.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud managed-kafka topics create :

    gcloud managed-kafka topics create TOPIC_ID \
        --cluster=CLUSTER_ID --location=LOCATION_ID \
        --partitions=PARTITIONS \
        --replication-factor=REPLICATION_FACTOR \
        --configs=CONFIGS
    

    Remplacez les éléments suivants :

    • TOPIC_ID : nom du thème.
    • CLUSTER : nom du cluster dans lequel vous souhaitez créer le sujet.
    • LOCATION : région du cluster.
    • PARTITIONS : nombre de partitions pour le sujet.
    • REPLICATION_FACTOR : facteur de réplication du thème.
    • CONFIGS : paramètres facultatifs au niveau du thème. Spécifiez-les sous forme de paires clé/valeur séparées par une virgule. Exemple : compression.type=producer.
  3. CLI Kafka

    Avant d'exécuter cette commande, installez les outils de ligne de commande Kafka sur une VM Compute Engine. La VM doit pouvoir accéder à un sous-réseau connecté à votre cluster Managed Service pour Apache Kafka. Suivez les instructions de la section Produire et consommer des messages avec les outils de ligne de commande Kafka.

    Exécutez la commande kafka-topics.sh comme suit :

    kafka-topics.sh --create --if-not-exists \
      --bootstrap-server=BOOTSTRAP_ADDRESS \
      --command-config client.properties \
      --topic TOPIC_ID \
      --partitions PARTITIONS \
      --replication-factor REPLICATION_FACTOR
    

    Remplacez les éléments suivants :

    • BOOTSTRAP_ADDRESS : adresse d'amorçage du cluster Managed Service pour Apache Kafka.

    • TOPIC_ID : nom du thème.

    • PARTITIONS : nombre de partitions pour le sujet.

    • REPLICATION_FACTOR : facteur de réplication du sujet.

    REST

    Avant d'utiliser les données de requête, effectuez les remplacements suivants :

    • PROJECT_ID : ID de votre projet Google Cloud
    • LOCATION : emplacement du cluster
    • CLUSTER_ID : ID du cluster
    • TOPIC_ID : ID du sujet
    • PARTITION_COUNT : nombre de partitions pour le sujet
    • REPLICATION_FACTOR : nombre d'instances répliquées de chaque partition

    Méthode HTTP et URL :

    POST https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics?topicId=TOPIC_ID

    Corps JSON de la requête :

    {
      "name": "TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Pour envoyer votre requête, développez l'une des options suivantes :

    Vous devriez recevoir une réponse JSON de ce type :

    {
      "name": "projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC_ID",
      "partitionCount": PARTITION_COUNT,
      "replicationFactor": REPLICATION_FACTOR
    }
    

    Terraform

    Vous pouvez utiliser une ressource Terraform pour créer un sujet.

    resource "google_managed_kafka_topic" "default" {
      project            = data.google_project.default.project_id # Replace this with your project ID in quotes
      topic_id           = "my-topic-id"
      cluster            = google_managed_kafka_cluster.default.cluster_id
      location           = "us-central1"
      partition_count    = 2
      replication_factor = 3
    }

    Pour savoir comment appliquer ou supprimer une configuration Terraform, consultez Commandes Terraform de base.

    Go

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    )
    
    func createTopic(w io.Writer, projectID, region, clusterID, topicID string, partitionCount, replicationFactor int32, configs map[string]string, opts ...option.ClientOption) error {
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// clusterID := "my-cluster"
    	// topicID := "my-topic"
    	// partitionCount := 10
    	// replicationFactor := 3
    	// configs := map[string]string{"min.insync.replicas":"1"}
    	ctx := context.Background()
    	client, err := managedkafka.NewClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewClient got err: %w", err)
    	}
    	defer client.Close()
    
    	clusterPath := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", projectID, region, clusterID)
    	topicPath := fmt.Sprintf("%s/topics/%s", clusterPath, topicID)
    	topicConfig := &managedkafkapb.Topic{
    		Name:              topicPath,
    		PartitionCount:    partitionCount,
    		ReplicationFactor: replicationFactor,
    		Configs:           configs,
    	}
    
    	req := &managedkafkapb.CreateTopicRequest{
    		Parent:  clusterPath,
    		TopicId: topicID,
    		Topic:   topicConfig,
    	}
    	topic, err := client.CreateTopic(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateTopic got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
    	return nil
    }
    

    Java

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Java pour Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ClusterName;
    import com.google.cloud.managedkafka.v1.CreateTopicRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaClient;
    import com.google.cloud.managedkafka.v1.Topic;
    import com.google.cloud.managedkafka.v1.TopicName;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreateTopic {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String clusterId = "my-cluster";
        String topicId = "my-topic";
        int partitionCount = 100;
        int replicationFactor = 3;
        Map<String, String> configs =
            new HashMap<String, String>() {
              {
                put("min.insync.replicas", "2");
              }
            };
        createTopic(projectId, region, clusterId, topicId, partitionCount, replicationFactor, configs);
      }
    
      public static void createTopic(
          String projectId,
          String region,
          String clusterId,
          String topicId,
          int partitionCount,
          int replicationFactor,
          Map<String, String> configs)
          throws Exception {
        Topic topic =
            Topic.newBuilder()
                .setName(TopicName.of(projectId, region, clusterId, topicId).toString())
                .setPartitionCount(partitionCount)
                .setReplicationFactor(replicationFactor)
                .putAllConfigs(configs)
                .build();
        try (ManagedKafkaClient managedKafkaClient = ManagedKafkaClient.create()) {
          CreateTopicRequest request =
              CreateTopicRequest.newBuilder()
                  .setParent(ClusterName.of(projectId, region, clusterId).toString())
                  .setTopicId(topicId)
                  .setTopic(topic)
                  .build();
          // This operation is being handled synchronously.
          Topic response = managedKafkaClient.createTopic(request);
          System.out.printf("Created topic: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaClient.createTopic got err: %s", e.getMessage());
        }
      }
    }
    

    Python

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    from google.api_core.exceptions import AlreadyExists
    from google.cloud import managedkafka_v1
    
    # TODO(developer)
    # project_id = "my-project-id"
    # region = "us-central1"
    # cluster_id = "my-cluster"
    # topic_id = "my-topic"
    # partition_count = 10
    # replication_factor = 3
    # configs = {"min.insync.replicas": "1"}
    
    client = managedkafka_v1.ManagedKafkaClient()
    
    topic = managedkafka_v1.Topic()
    topic.name = client.topic_path(project_id, region, cluster_id, topic_id)
    topic.partition_count = partition_count
    topic.replication_factor = replication_factor
    # For a list of configs, one can check https://kafka.apache.org/documentation/#topicconfigs
    topic.configs = configs
    
    request = managedkafka_v1.CreateTopicRequest(
        parent=client.cluster_path(project_id, region, cluster_id),
        topic_id=topic_id,
        topic=topic,
    )
    
    try:
        response = client.create_topic(request=request)
        print("Created topic:", response.name)
    except AlreadyExists as e:
        print(f"Failed to create topic {topic.name} with error: {e.message}")
    

Étape suivante