Créer un connecteur de récepteur Pub/Sub

Les connecteurs de récepteur Pub/Sub diffusent les messages des sujets Kafka vers les sujets Pub/Sub. Cela vous permet d'intégrer vos applications basées sur Kafka à Pub/Sub, ce qui facilite les architectures événementielles et le traitement des données en temps réel.

Avant de commencer

Avant de créer un connecteur de récepteur Pub/Sub, assurez-vous de disposer des éléments suivants :

Rôles et autorisations nécessaires

Pour obtenir les autorisations nécessaires pour créer un connecteur de récepteur Pub/Sub, demandez à votre administrateur de vous accorder les rôles IAM suivants sur le projet contenant le cluster Connect :

Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ces rôles prédéfinis contiennent les autorisations requises pour créer un connecteur de récepteur Pub/Sub. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour créer un connecteur de récepteur Pub/Sub :

  • Accordez l'autorisation de créer un connecteur sur le cluster Connect parent : managedkafka.connectors.create

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Pour en savoir plus sur le rôle Éditeur de connecteurs Kafka gérés, consultez Rôles prédéfinis de Managed Service pour Apache Kafka.

Si votre cluster Managed Service pour Apache Kafka se trouve dans le même projet que le cluster Connect, aucune autre autorisation n'est requise. Si le cluster Connect se trouve dans un autre projet, consultez Créer un cluster Connect dans un autre projet.

Accorder les autorisations de publication dans le sujet Pub/Sub

Le compte de service du cluster Connect, qui suit le format service-<project_number>@gcp-sa-managedkafka.iam.gserviceaccount.com, doit être autorisé à publier des messages dans le sujet Pub/Sub. Pour ce faire, accordez le rôle Éditeur Pub/Sub (roles/pubsub.publisher) au compte de service du cluster Connect dans le projet contenant le sujet Pub/Sub.

Fonctionnement d'un connecteur de récepteur Pub/Sub

Un connecteur de récepteur Pub/Sub extrait les messages d'un ou de plusieurs sujets Kafka et les publie dans un sujet Pub/Sub.

Voici une explication détaillée de la façon dont le connecteur de récepteur Pub/Sub copie les données :

  • Le connecteur consomme les messages d'un ou de plusieurs sujets Kafka dans le cluster source.

  • Le connecteur écrit les messages dans l'ID de sujet Pub/Sub cible spécifié à l'aide de la propriété de configuration cps.topic. Cette propriété est obligatoire.

  • Le connecteur exige également que le projet Google Cloud contenant le sujet Pub/Sub soit spécifié à l'aide de la propriété de configuration cps.project. Il s'agit d'une propriété obligatoire.

  • Le connecteur peut également utiliser un point de terminaison Pub/Sub personnalisé spécifié à l'aide de la propriété cps.endpoint. Le point de terminaison par défaut est "pubsub.googleapis.com:443".

  • Pour optimiser les performances, le connecteur met les messages en mémoire tampon avant de les publier dans Pub/Sub. Vous pouvez configurer maxBufferSize, maxBufferBytes, maxDelayThresholdMs, maxOutstandingRequestBytes et maxOutstandingMessages pour contrôler la mise en mémoire tampon.

  • Un enregistrement Kafka comporte trois composants : des en-têtes, des clés et des valeurs. Le connecteur utilise des convertisseurs de clés et de valeurs pour transformer les données des messages Kafka au format attendu par Pub/Sub. Lorsque vous utilisez des schémas de valeurs structurées ou de mappage, la propriété messageBodyName spécifie le champ ou la clé à utiliser comme corps du message Pub/Sub.

  • Le connecteur peut inclure le sujet, la partition, le décalage et le code temporel Kafka en tant qu'attributs de message en définissant la propriété metadata.publish sur true.

  • Le connecteur peut inclure des en-têtes de message Kafka en tant qu'attributs de message Pub/Sub en utilisant la propriété headers.publish définie sur true.

  • Le connecteur peut inclure une clé de tri pour les messages Pub/Sub à l'aide de la propriété orderingKeySource. Les options pour sa valeur incluent "none" (par défaut), "key" et "partition".

  • La propriété tasks.max contrôle le niveau de parallélisme du connecteur. L'augmentation de tasks.max peut améliorer le débit, mais le parallélisme réel est limité par le nombre de partitions dans les sujets Kafka.

Propriétés d'un connecteur de récepteur Pub/Sub

Lorsque vous créez un connecteur de récepteur Pub/Sub, vous devez spécifier les propriétés suivantes.

Nom du connecteur

Nom unique du connecteur dans le cluster Connect. Pour obtenir des instructions sur la façon de nommer les ressources, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka.

Type de plug-in de connecteur

Sélectionnez Récepteur Pub/Sub comme type de plug-in de connecteur. Cela détermine le sens du flux de données, qui va de Kafka vers Pub/Sub, ainsi que l'implémentation spécifique du connecteur utilisé. Si vous n'utilisez pas l'interface utilisateur pour configurer le connecteur, vous devez également spécifier la classe de connecteur.

Sujets Kafka

Sujets Kafka à partir desquels le connecteur consomme des messages. Vous pouvez spécifier un ou plusieurs thèmes, ou utiliser une expression régulière pour faire correspondre plusieurs thèmes. Par exemple, topic.* pour correspondre à tous les sujets commençant par "topic". Ces sujets doivent exister dans le cluster Managed Service pour Apache Kafka associé à votre cluster Connect.

Sujet Pub/Sub

Sujet Pub/Sub existant dans lequel le connecteur publie des messages. Assurez-vous que le compte de service du cluster Connect dispose du rôle roles/pubsub.publisher sur le projet du thème, comme décrit dans Avant de commencer.

Configuration

Cette section vous permet de spécifier des propriétés de configuration supplémentaires spécifiques au connecteur.

Étant donné que les données des thèmes Kafka peuvent être dans différents formats (Avro, JSON ou octets bruts, par exemple), une partie essentielle de la configuration consiste à spécifier des convertisseurs. Les convertisseurs traduisent les données du format utilisé dans vos sujets Kafka au format interne standardisé de Kafka Connect. Le connecteur de récepteur Pub/Sub prend ensuite ces données internes et les transforme au format requis par Pub/Sub avant de les écrire.

Pour obtenir des informations plus générales sur le rôle des convertisseurs dans Kafka Connect, les types de convertisseurs compatibles et les options de configuration courantes, consultez Convertisseurs.

Voici quelques configurations spécifiques au connecteur de récepteur Pub/Sub :

  • cps.project : spécifie l'ID du projet Google Cloud contenant le sujet Pub/Sub.

  • cps.topic : spécifie le sujet Pub/Sub dans lequel les données sont publiées.

  • cps.endpoint : spécifie le point de terminaison Pub/Sub à utiliser.

Pour obtenir la liste des propriétés de configuration disponibles spécifiques à ce connecteur, consultez la section Configurations du connecteur de récepteur Pub/Sub.

Créer un connecteur de récepteur Pub/Sub

Avant de créer un connecteur, consultez la documentation sur les propriétés d'un connecteur de récepteur Pub/Sub.

Console

  1. Dans la console Google Cloud , accédez à la page Connecter des clusters.

    Accéder à Connect Clusters

  2. Cliquez sur le cluster Connect pour lequel vous souhaitez créer le connecteur.

    La page Connecter les détails du cluster s'affiche.

  3. Cliquez sur Créer un connecteur.

    La page Créer un connecteur Kafka s'affiche.

  4. Saisissez une chaîne pour le nom du connecteur.

    Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka.

  5. Pour Plug-in de connecteur, sélectionnez Récepteur Pub/Sub.

  6. Sous Sujets, choisissez Sélectionner une liste de sujets Kafka ou Utiliser une expression régulière de sujet. Sélectionnez ou saisissez ensuite le ou les sujets Kafka à partir desquels ce connecteur consomme des messages. Ces sujets se trouvent dans votre cluster Kafka associé.

  7. Dans Sélectionner un sujet Cloud Pub/Sub, choisissez le sujet Pub/Sub sur lequel ce connecteur publie des messages. Le sujet s'affiche au format de nom de ressource complet : projects/{project}/topics/{topic}.

  8. (Facultatif) Configurez d'autres paramètres dans la section Configurations. C'est ici que vous spécifiez des propriétés telles que tasks.max, key.converter et value.converter, comme indiqué dans la section précédente.

  9. Sélectionnez la règle de redémarrage des tâches. Pour en savoir plus, consultez la section Règles de redémarrage des tâches.

  10. Cliquez sur Créer.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Exécutez la commande gcloud managed-kafka connectors create :

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Remplacez les éléments suivants :

    • CONNECTOR_ID : ID ou nom du connecteur. Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un connecteur est immuable.

    • LOCATION : emplacement où vous créez le connecteur. Il doit s'agir du même emplacement que celui où vous avez créé le cluster Connect.

    • CONNECT_CLUSTER_ID : ID du cluster Connect dans lequel le connecteur est créé.

    • CONFIG_FILE : chemin d'accès au fichier de configuration YAML pour le connecteur BigQuery Sink.

    Voici un exemple de fichier de configuration pour le connecteur Pub/Sub Sink :

    connector.class: "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
    name: "CPS_SINK_CONNECTOR_ID"
    tasks.max: "1"
    topics: "GMK_TOPIC_ID"
    value.converter: "org.apache.kafka.connect.storage.StringConverter"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    cps.topic: "CPS_TOPIC_ID"
    cps.project: "GCP_PROJECT_ID"
    

    Remplacez les éléments suivants :

    • CPS_SINK_CONNECTOR_ID : ID ou nom du connecteur Pub/Sub Sink. Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un connecteur est immuable.

    • GMK_TOPIC_ID : ID du sujet Managed Service pour Apache Kafka à partir duquel les données sont lues par le connecteur de récepteur Pub/Sub.

    • CPS_TOPIC_ID : ID du sujet Pub/Sub dans lequel les données sont publiées.

    • GCP_PROJECT_ID : ID du projet Google Cloudoù réside votre sujet Pub/Sub.

  3. Terraform

    Vous pouvez utiliser une ressource Terraform pour créer un connecteur.

    resource "google_managed_kafka_connector" "example-pubsub-sink-connector" {
      project         = data.google_project.default.project_id
      connector_id    = "my-pubsub-sink-connector"
      connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
      location        = "us-central1"
    
      configs = {
        "connector.class" = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector"
        "name"            = "my-pubsub-sink-connector"
        "tasks.max"       = "3"
        "topics"          = "TOPIC_NAME"
        "cps.topic"       = "CPS_TOPIC_NAME"
        "cps.project"     = "CPS_PROJECT_NAME"
        "value.converter" = "org.apache.kafka.connect.storage.StringConverter"
        "key.converter"   = "org.apache.kafka.connect.storage.StringConverter"
      }
    
      provider = google-beta
    }

    Pour savoir comment appliquer ou supprimer une configuration Terraform, consultez Commandes Terraform de base.

    Go

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    import (
    	"context"
    	"fmt"
    	"io"
    
    	managedkafka "cloud.google.com/go/managedkafka/apiv1"
    	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
    	"google.golang.org/api/option"
    )
    
    // createPubSubSinkConnector creates a Pub/Sub Sink connector.
    func createPubSubSinkConnector(w io.Writer, projectID, region, connectClusterID, connectorID, topics, valueConverter, keyConverter, cpsTopic, cpsProject, tasksMax string, opts ...option.ClientOption) error {
    	// TODO(developer): Update with your config values. Here is a sample configuration:
    	// projectID := "my-project-id"
    	// region := "us-central1"
    	// connectClusterID := "my-connect-cluster"
    	// connectorID := "CPS_SINK_CONNECTOR_ID"
    	// topics := "GMK_TOPIC_ID"
    	// valueConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
    	// cpsTopic := "CPS_TOPIC_ID"
    	// cpsProject := "GCP_PROJECT_ID"
    	// tasksMax := "3"
    	ctx := context.Background()
    	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
    	if err != nil {
    		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
    	}
    	defer client.Close()
    
    	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)
    
    	// Pub/Sub Sink sample connector configuration
    	config := map[string]string{
    		"connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    		"name":            connectorID,
    		"tasks.max":       tasksMax,
    		"topics":          topics,
    		"value.converter": valueConverter,
    		"key.converter":   keyConverter,
    		"cps.topic":       cpsTopic,
    		"cps.project":     cpsProject,
    	}
    
    	connector := &managedkafkapb.Connector{
    		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
    		Configs: config,
    	}
    
    	req := &managedkafkapb.CreateConnectorRequest{
    		Parent:      parent,
    		ConnectorId: connectorID,
    		Connector:   connector,
    	}
    
    	resp, err := client.CreateConnector(ctx, req)
    	if err != nil {
    		return fmt.Errorf("client.CreateConnector got err: %w", err)
    	}
    	fmt.Fprintf(w, "Created Pub/Sub sink connector: %s\n", resp.Name)
    	return nil
    }
    

    Java

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Java pour Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    
    import com.google.api.gax.rpc.ApiException;
    import com.google.cloud.managedkafka.v1.ConnectClusterName;
    import com.google.cloud.managedkafka.v1.Connector;
    import com.google.cloud.managedkafka.v1.ConnectorName;
    import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
    import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    
    public class CreatePubSubSinkConnector {
    
      public static void main(String[] args) throws Exception {
        // TODO(developer): Replace these variables before running the example.
        String projectId = "my-project-id";
        String region = "my-region"; // e.g. us-east1
        String connectClusterId = "my-connect-cluster";
        String connectorId = "my-pubsub-sink-connector";
        String pubsubProjectId = "my-pubsub-project-id";
        String pubsubTopicName = "my-pubsub-topic";
        String kafkaTopicName = "kafka-topic";
        String connectorClass = "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector";
        String maxTasks = "3";
        String valueConverter = "org.apache.kafka.connect.storage.StringConverter";
        String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
        createPubSubSinkConnector(
            projectId,
            region,
            connectClusterId,
            connectorId,
            pubsubProjectId,
            pubsubTopicName,
            kafkaTopicName,
            connectorClass,
            maxTasks,
            valueConverter,
            keyConverter);
      }
    
      public static void createPubSubSinkConnector(
          String projectId,
          String region,
          String connectClusterId,
          String connectorId,
          String pubsubProjectId,
          String pubsubTopicName,
          String kafkaTopicName,
          String connectorClass,
          String maxTasks,
          String valueConverter,
          String keyConverter)
          throws Exception {
    
        // Build the connector configuration
        Map<String, String> configMap = new HashMap<>();
        configMap.put("connector.class", connectorClass);
        configMap.put("name", connectorId);
        configMap.put("tasks.max", maxTasks);
        configMap.put("topics", kafkaTopicName);
        configMap.put("value.converter", valueConverter);
        configMap.put("key.converter", keyConverter);
        configMap.put("cps.topic", pubsubTopicName);
        configMap.put("cps.project", pubsubProjectId);
    
        Connector connector = Connector.newBuilder()
            .setName(
                ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
            .putAllConfigs(configMap)
            .build();
    
        try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
          CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
              .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
              .setConnectorId(connectorId)
              .setConnector(connector)
              .build();
    
          // This operation is being handled synchronously.
          Connector response = managedKafkaConnectClient.createConnector(request);
          System.out.printf("Created Pub/Sub Sink connector: %s\n", response.getName());
        } catch (IOException | ApiException e) {
          System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
        }
      }
    }
    

    Python

    Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python Managed Service pour Apache Kafka.

    Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

    from google.api_core.exceptions import GoogleAPICallError
    from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
        ManagedKafkaConnectClient,
    )
    from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest
    
    connect_client = ManagedKafkaConnectClient()
    parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)
    
    configs = {
        "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
        "name": connector_id,
        "tasks.max": tasks_max,
        "topics": topics,
        "value.converter": value_converter,
        "key.converter": key_converter,
        "cps.topic": cps_topic,
        "cps.project": cps_project,
    }
    
    connector = Connector()
    connector.name = connector_id
    connector.configs = configs
    
    request = CreateConnectorRequest(
        parent=parent,
        connector_id=connector_id,
        connector=connector,
    )
    
    try:
        operation = connect_client.create_connector(request=request)
        print(f"Waiting for operation {operation.operation.name} to complete...")
        response = operation.result()
        print("Created Connector:", response)
    except GoogleAPICallError as e:
        print(f"The operation failed with error: {e}")

Une fois que vous avez créé un connecteur, vous pouvez le modifier, le supprimer, le suspendre, l'arrêter ou le redémarrer.

Étape suivante

Apache Kafka® est une marque déposée d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.