Créer un connecteur de source Pub/Sub

Les connecteurs sources Pub/Sub diffusent des messages de Pub/Sub vers Kafka. Cela vous permet d'intégrer Pub/Sub à vos applications et pipelines de données basés sur Kafka.

Le connecteur lit les messages d'un abonnement Pub/Sub, convertit chaque message en enregistrement Kafka et écrit les enregistrements dans un sujet Kafka. Par défaut, le connecteur crée des enregistrements Kafka comme suit :

  • La clé d'enregistrement Kafka est null.
  • La valeur de l'enregistrement Kafka correspond aux données du message Pub/Sub en tant qu'octets.
  • Les en-têtes d'enregistrement Kafka sont vides.

Toutefois, vous pouvez configurer ce comportement. Pour en savoir plus, consultez Configurer le connecteur.

Avant de commencer

Avant de créer un connecteur source Pub/Sub, assurez-vous de disposer des éléments suivants :

Rôles et autorisations nécessaires

Pour obtenir les autorisations nécessaires pour créer un connecteur de source Pub/Sub, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de connecteur Kafka géré (roles/managedkafka.connectorEditor) sur le projet contenant le cluster Connect. Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Ce rôle prédéfini contient les autorisations requises pour créer un connecteur de source Pub/Sub. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :

Autorisations requises

Les autorisations suivantes sont requises pour créer un connecteur source Pub/Sub :

  • Accordez l'autorisation de créer un connecteur sur le cluster Connect parent : managedkafka.connectors.create

Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.

Pour en savoir plus sur le rôle d'éditeur de connecteur Kafka géré, consultez Rôles prédéfinis de Managed Service pour Apache Kafka.

Si votre cluster Managed Service pour Apache Kafka se trouve dans le même projet que le cluster Connect, aucune autre autorisation n'est requise. Si le cluster Connect se trouve dans un autre projet, consultez Créer un cluster Connect dans un autre projet.

Accorder des autorisations pour lire à partir de Pub/Sub

Le compte de service Managed Kafka doit être autorisé à lire les messages de l'abonnement Pub/Sub. Attribuez les rôles IAM suivants au compte de service dans le projet contenant l'abonnement Pub/Sub :

  • Abonné Pub/Sub (roles/pubsub.subscriber)
  • Lecteur Pub/Sub (roles/pubsub.viewer)

Le compte de service Managed Kafka a le format suivant : service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com. Remplacez PROJECT_NUMBER par le numéro du projet.

Créer un connecteur source Pub/Sub

Console

  1. Dans la console Google Cloud , accédez à la page Connecter des clusters.

    Accéder à Connect Clusters

  2. Cliquez sur le cluster Connect dans lequel vous souhaitez créer le connecteur.

  3. Cliquez sur Créer un connecteur.

  4. Saisissez une chaîne pour le nom du connecteur.

    Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka.

  5. Pour Plug-in de connecteur, sélectionnez Source Pub/Sub.

  6. Dans la liste Abonnement Cloud Pub/Sub, sélectionnez un abonnement Pub/Sub. Le connecteur extrait les messages de cet abonnement. L'abonnement s'affiche sous la forme d'un nom de ressource complet : projects/{project}/subscriptions/{subscription}.

  7. Dans la liste Sujet Kafka, sélectionnez le sujet Kafka dans lequel les messages sont écrits.

  8. Facultatif : Dans la zone Configurations, ajoutez des propriétés de configuration ou modifiez les propriétés par défaut. Pour en savoir plus, consultez Configurer le connecteur.

  9. Sélectionnez la règle de redémarrage des tâches. Pour en savoir plus, consultez la section Règles de redémarrage des tâches.

  10. Cliquez sur Créer.

gcloud

  1. Exécutez la commande gcloud managed-kafka connectors create :

    gcloud managed-kafka connectors create CONNECTOR_ID \
        --location=LOCATION \
        --connect-cluster=CONNECT_CLUSTER_ID \
        --config-file=CONFIG_FILE
    

    Remplacez les éléments suivants :

    • CONNECTOR_ID : ID ou nom du connecteur. Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un connecteur est immuable.

    • LOCATION : emplacement du cluster Connect.

    • CONNECT_CLUSTER_ID : ID du cluster Connect où le connecteur est créé.

    • CONFIG_FILE : chemin d'accès à un fichier de configuration YAML ou JSON.

Voici un exemple de fichier de configuration :

connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Clouddans lequel réside l'abonnement Pub/Sub.

  • PUBSUB_SUBSCRIPTION_ID : ID de l'abonnement Pub/Sub à partir duquel extraire les données.

  • KAFKA_TOPIC_ID : ID du sujet Kafka dans lequel les données sont écrites.

Les propriétés de configuration cps.project, cps.subscription et kafka.topic sont obligatoires. Pour découvrir d'autres options de configuration, consultez Configurer le connecteur.

Terraform

Vous pouvez utiliser une ressource Terraform pour créer un connecteur.

resource "google_managed_kafka_connector" "example-pubsub-source-connector" {
  project         = data.google_project.default.project_id
  connector_id    = "my-pubsub-source-connector"
  connect_cluster = google_managed_kafka_connect_cluster.default.connect_cluster_id
  location        = "us-central1"

  configs = {
    "connector.class"  = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
    "name"             = "my-pubsub-source-connector"
    "tasks.max"        = "3"
    "kafka.topic"      = "GMK_TOPIC_ID"
    "cps.subscription" = "CPS_SUBSCRIPTION_ID"
    "cps.project"      = data.google_project.default.project_id
    "value.converter"  = "org.apache.kafka.connect.converters.ByteArrayConverter"
    "key.converter"    = "org.apache.kafka.connect.storage.StringConverter"
  }

  provider = google-beta
}

Pour savoir comment appliquer ou supprimer une configuration Terraform, consultez Commandes Terraform de base.

Go

Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"

	managedkafka "cloud.google.com/go/managedkafka/apiv1"
	"cloud.google.com/go/managedkafka/apiv1/managedkafkapb"
	"google.golang.org/api/option"
)

// createPubSubSourceConnector creates a Pub/Sub Source connector.
func createPubSubSourceConnector(w io.Writer, projectID, region, connectClusterID, connectorID, kafkaTopic, cpsSubscription, cpsProject, tasksMax, valueConverter, keyConverter string, opts ...option.ClientOption) error {
	// TODO(developer): Update with your config values. Here is a sample configuration:
	// projectID := "my-project-id"
	// region := "us-central1"
	// connectClusterID := "my-connect-cluster"
	// connectorID := "CPS_SOURCE_CONNECTOR_ID"
	// kafkaTopic := "GMK_TOPIC_ID"
	// cpsSubscription := "CPS_SUBSCRIPTION_ID"
	// cpsProject := "GCP_PROJECT_ID"
	// tasksMax := "3"
	// valueConverter := "org.apache.kafka.connect.converters.ByteArrayConverter"
	// keyConverter := "org.apache.kafka.connect.storage.StringConverter"
	ctx := context.Background()
	client, err := managedkafka.NewManagedKafkaConnectClient(ctx, opts...)
	if err != nil {
		return fmt.Errorf("managedkafka.NewManagedKafkaConnectClient got err: %w", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s/connectClusters/%s", projectID, region, connectClusterID)

	// Pub/Sub Source sample connector configuration
	config := map[string]string{
		"connector.class":  "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
		"name":             connectorID,
		"tasks.max":        tasksMax,
		"kafka.topic":      kafkaTopic,
		"cps.subscription": cpsSubscription,
		"cps.project":      cpsProject,
		"value.converter":  valueConverter,
		"key.converter":    keyConverter,
	}

	connector := &managedkafkapb.Connector{
		Name:    fmt.Sprintf("%s/connectors/%s", parent, connectorID),
		Configs: config,
	}

	req := &managedkafkapb.CreateConnectorRequest{
		Parent:      parent,
		ConnectorId: connectorID,
		Connector:   connector,
	}

	resp, err := client.CreateConnector(ctx, req)
	if err != nil {
		return fmt.Errorf("client.CreateConnector got err: %w", err)
	}
	fmt.Fprintf(w, "Created Pub/Sub source connector: %s\n", resp.Name)
	return nil
}

Java

Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Java pour Managed Service pour Apache Kafka.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.


import com.google.api.gax.rpc.ApiException;
import com.google.cloud.managedkafka.v1.ConnectClusterName;
import com.google.cloud.managedkafka.v1.Connector;
import com.google.cloud.managedkafka.v1.ConnectorName;
import com.google.cloud.managedkafka.v1.CreateConnectorRequest;
import com.google.cloud.managedkafka.v1.ManagedKafkaConnectClient;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class CreatePubSubSourceConnector {

  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the example.
    String projectId = "my-project-id";
    String region = "my-region"; // e.g. us-east1
    String connectClusterId = "my-connect-cluster";
    String connectorId = "my-pubsub-source-connector";
    String pubsubProjectId = "my-pubsub-project-id";
    String subscriptionName = "my-subscription";
    String kafkaTopicName = "pubsub-topic";
    String connectorClass = "com.google.pubsub.kafka.source.CloudPubSubSourceConnector";
    String maxTasks = "3";
    String valueConverter = "org.apache.kafka.connect.converters.ByteArrayConverter";
    String keyConverter = "org.apache.kafka.connect.storage.StringConverter";
    createPubSubSourceConnector(
        projectId,
        region,
        connectClusterId,
        connectorId,
        pubsubProjectId,
        subscriptionName,
        kafkaTopicName,
        connectorClass,
        maxTasks,
        valueConverter,
        keyConverter);
  }

  public static void createPubSubSourceConnector(
      String projectId,
      String region,
      String connectClusterId,
      String connectorId,
      String pubsubProjectId,
      String subscriptionName,
      String kafkaTopicName,
      String connectorClass,
      String maxTasks,
      String valueConverter,
      String keyConverter)
      throws Exception {

    // Build the connector configuration
    Map<String, String> configMap = new HashMap<>();
    configMap.put("connector.class", connectorClass);
    configMap.put("name", connectorId);
    configMap.put("tasks.max", maxTasks);
    configMap.put("kafka.topic", kafkaTopicName);
    configMap.put("cps.subscription", subscriptionName);
    configMap.put("cps.project", pubsubProjectId);
    configMap.put("value.converter", valueConverter);
    configMap.put("key.converter", keyConverter);

    Connector connector = Connector.newBuilder()
        .setName(
            ConnectorName.of(projectId, region, connectClusterId, connectorId).toString())
        .putAllConfigs(configMap)
        .build();

    try (ManagedKafkaConnectClient managedKafkaConnectClient = ManagedKafkaConnectClient.create()) {
      CreateConnectorRequest request = CreateConnectorRequest.newBuilder()
          .setParent(ConnectClusterName.of(projectId, region, connectClusterId).toString())
          .setConnectorId(connectorId)
          .setConnector(connector)
          .build();

      // This operation is being handled synchronously.
      Connector response = managedKafkaConnectClient.createConnector(request);
      System.out.printf("Created Pub/Sub Source connector: %s\n", response.getName());
    } catch (IOException | ApiException e) {
      System.err.printf("managedKafkaConnectClient.createConnector got err: %s\n", e.getMessage());
    }
  }
}

Python

Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python Managed Service pour Apache Kafka.

Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.

from google.api_core.exceptions import GoogleAPICallError
from google.cloud.managedkafka_v1.services.managed_kafka_connect import (
    ManagedKafkaConnectClient,
)
from google.cloud.managedkafka_v1.types import Connector, CreateConnectorRequest

connect_client = ManagedKafkaConnectClient()
parent = connect_client.connect_cluster_path(project_id, region, connect_cluster_id)

configs = {
    "connector.class": "com.google.pubsub.kafka.source.CloudPubSubSourceConnector",
    "name": connector_id,
    "tasks.max": tasks_max,
    "kafka.topic": kafka_topic,
    "cps.subscription": cps_subscription,
    "cps.project": cps_project,
    "value.converter": value_converter,
    "key.converter": key_converter,
}

connector = Connector()
connector.name = connector_id
connector.configs = configs

request = CreateConnectorRequest(
    parent=parent,
    connector_id=connector_id,
    connector=connector,
)

try:
    operation = connect_client.create_connector(request=request)
    print(f"Waiting for operation {operation.operation.name} to complete...")
    response = operation.result()
    print("Created Connector:", response)
except GoogleAPICallError as e:
    print(f"The operation failed with error: {e}")

Une fois que vous avez créé un connecteur, vous pouvez le modifier, le supprimer, le suspendre, l'arrêter ou le redémarrer.

Configurer le connecteur

Cette section décrit certaines propriétés de configuration que vous pouvez définir sur le connecteur.

Pour obtenir la liste complète des propriétés spécifiques à ce connecteur, consultez Configurations du connecteur de source Pub/Sub.

Mode Pull

Le mode d'extraction spécifie la manière dont le connecteur récupère les messages Pub/Sub. Les modes suivants sont disponibles :

  • Mode Pull (par défaut) : Les messages sont extraits par lots. Pour activer ce mode, définissez cps.streamingPull.enabled=false.. Pour configurer la taille du lot, définissez la propriété cps.maxBatchSize.

    Pour en savoir plus sur le mode pull, consultez API Pull.

  • Mode Pull de streaming Permet d'obtenir le débit maximal et la latence la plus faible lors de la récupération des messages depuis Pub/Sub. Pour activer ce mode, définissez cps.streamingPull.enabled=true.

    Pour en savoir plus sur le mode pull de streaming, consultez l'API StreamingPull.

    Si l'extraction de flux est activée, vous pouvez ajuster les performances en définissant les propriétés de configuration suivantes :

    • cps.streamingPull.flowControlBytes : nombre maximal d'octets de messages en attente par tâche.
    • cps.streamingPull.flowControlMessages : nombre maximal de messages en attente par tâche.
    • cps.streamingPull.maxAckExtensionMs : durée maximale pendant laquelle le connecteur prolonge le délai d'abonnement, en millisecondes.
    • cps.streamingPull.maxMsPerAckExtension : durée maximale pendant laquelle le connecteur prolonge le délai d'abonnement par prolongation, en millisecondes.
    • cps.streamingPull.parallelStreams : nombre de flux à partir desquels extraire les messages de l'abonnement.

Point de terminaison Pub/Sub

Par défaut, le connecteur utilise le point de terminaison Pub/Sub mondial. Pour spécifier un point de terminaison, définissez la propriété cps.endpoint sur l'adresse du point de terminaison. Pour en savoir plus sur les points de terminaison, consultez Points de terminaison Pub/Sub.

Enregistrements Kafka

Le connecteur source Pub/Sub convertit les messages Pub/Sub en enregistrements Kafka. Les sections suivantes décrivent le processus de conversion.

Clé d'enregistrement

Le convertisseur de clé doit être org.apache.kafka.connect.storage.StringConverter.

  • Par défaut, les clés d'enregistrement sont null.

  • Pour utiliser un attribut de message Pub/Sub comme clé, définissez kafka.key.attribute sur le nom de l'attribut. Exemple : kafka.key.attribute=username.

  • Pour utiliser la clé d'ordonnancement Pub/Sub comme clé, définissez kafka.key.attribute=orderingKey.

En-têtes d'enregistrement

Par défaut, les en-têtes d'enregistrement sont vides.

Si kafka.record.headers est défini sur true, les attributs de message Pub/Sub sont écrits en tant qu'en-têtes d'enregistrement. Pour inclure la clé de tri, définissez cps.makeOrderingKeyAttribute=true.

Valeur de l'enregistrement

Si kafka.record.headers est défini sur true ou si le message Pub/Sub ne comporte aucun attribut personnalisé, la valeur de l'enregistrement correspond aux données du message, sous forme de tableau d'octets. Définissez le convertisseur de valeur sur org.apache.kafka.connect.converters.ByteArrayConverter.

Sinon, si kafka.record.headers est false et que le message comporte au moins un attribut personnalisé, le connecteur écrit la valeur de l'enregistrement sous la forme struct. Définissez le convertisseur de valeur sur org.apache.kafka.connect.json.JsonConverter.

La struct contient les champs suivants :

  • message : données du message Pub/Sub, en octets.

  • Un champ pour chaque attribut de message Pub/Sub. Pour inclure la clé de tri, définissez cps.makeOrderingKeyAttribute=true.

Par exemple, en supposant que le message comporte un attribut username :

{
  "message":"MESSAGE_DATA",
  "username":"Alice"
}

Si value.converter.schemas.enable est défini sur true, struct inclut à la fois la charge utile et le schéma :

{
  "schema":
    {
      "type":"struct",
      "fields": [
        {
          "type":"bytes",
          "optional":false,
          "field":"message"
        },
        {
          "type":"string",
          "optional":false,
          "field":"username"
        }
      ],
      "optional":false
    },
    "payload": {
      "message":"MESSAGE_DATA",
      "username":"Alice"
    }
}

Partitions Kafka

Par défaut, le connecteur écrit dans une seule partition du sujet. Pour spécifier le nombre de partitions dans lesquelles le connecteur écrit, définissez la propriété kafka.partition.count. La valeur ne doit pas dépasser le nombre de partitions du sujet.

Pour spécifier comment le connecteur attribue les messages aux partitions, définissez la propriété kafka.partition.scheme. Pour en savoir plus, consultez Configurations du connecteur de source Pub/Sub.

Étape suivante

Apache Kafka® est une marque déposée d'Apache Software Foundation ou de ses filiales aux États-Unis et/ou dans d'autres pays.