Les connecteurs sources Pub/Sub diffusent des messages de Pub/Sub vers Kafka. Cela vous permet d'intégrer Pub/Sub à vos applications et pipelines de données basés sur Kafka.
Le connecteur lit les messages d'un abonnement Pub/Sub, convertit chaque message en enregistrement Kafka et écrit les enregistrements dans un sujet Kafka. Par défaut, le connecteur crée des enregistrements Kafka comme suit :
- La clé d'enregistrement Kafka est
null. - La valeur de l'enregistrement Kafka correspond aux données du message Pub/Sub en tant qu'octets.
- Les en-têtes d'enregistrement Kafka sont vides.
Toutefois, vous pouvez configurer ce comportement. Pour en savoir plus, consultez Configurer le connecteur.
Avant de commencer
Avant de créer un connecteur source Pub/Sub, assurez-vous de disposer des éléments suivants :
Un sujet Pub/Sub avec un abonnement.
Un sujet Kafka dans le cluster Kafka.
Un cluster de connexion. Lorsque vous créez le cluster Connect, définissez le cluster Managed Service pour Apache Kafka comme cluster Kafka principal.
Rôles et autorisations nécessaires
Pour obtenir les autorisations nécessaires pour créer un connecteur de source Pub/Sub, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de connecteur Kafka géré (roles/managedkafka.connectorEditor) sur le projet contenant le cluster Connect.
Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.
Ce rôle prédéfini contient les autorisations requises pour créer un connecteur de source Pub/Sub. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :
Autorisations requises
Les autorisations suivantes sont requises pour créer un connecteur source Pub/Sub :
-
Accordez l'autorisation de créer un connecteur sur le cluster Connect parent :
managedkafka.connectors.create
Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.
Pour en savoir plus sur le rôle d'éditeur de connecteur Kafka géré, consultez Rôles prédéfinis de Managed Service pour Apache Kafka.
Si votre cluster Managed Service pour Apache Kafka se trouve dans le même projet que le cluster Connect, aucune autre autorisation n'est requise. Si le cluster Connect se trouve dans un autre projet, consultez Créer un cluster Connect dans un autre projet.
Accorder des autorisations pour lire à partir de Pub/Sub
Le compte de service Managed Kafka doit être autorisé à lire les messages de l'abonnement Pub/Sub. Attribuez les rôles IAM suivants au compte de service dans le projet contenant l'abonnement Pub/Sub :
- Abonné Pub/Sub (
roles/pubsub.subscriber) - Lecteur Pub/Sub (
roles/pubsub.viewer)
Le compte de service Managed Kafka a le format suivant : service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com.
Remplacez PROJECT_NUMBER par le numéro du projet.
Créer un connecteur source Pub/Sub
Console
Dans la console Google Cloud , accédez à la page Connecter des clusters.
Cliquez sur le cluster Connect dans lequel vous souhaitez créer le connecteur.
Cliquez sur Créer un connecteur.
Saisissez une chaîne pour le nom du connecteur.
Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka.
Pour Plug-in de connecteur, sélectionnez Source Pub/Sub.
Dans la liste Abonnement Cloud Pub/Sub, sélectionnez un abonnement Pub/Sub. Le connecteur extrait les messages de cet abonnement. L'abonnement s'affiche sous la forme d'un nom de ressource complet :
projects/{project}/subscriptions/{subscription}.Dans la liste Sujet Kafka, sélectionnez le sujet Kafka dans lequel les messages sont écrits.
Facultatif : Dans la zone Configurations, ajoutez des propriétés de configuration ou modifiez les propriétés par défaut. Pour en savoir plus, consultez Configurer le connecteur.
Sélectionnez la règle de redémarrage des tâches. Pour en savoir plus, consultez la section Règles de redémarrage des tâches.
Cliquez sur Créer.
gcloud
Exécutez la commande
gcloud managed-kafka connectors create:gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILERemplacez les éléments suivants :
CONNECTOR_ID : ID ou nom du connecteur. Pour obtenir des instructions sur la façon de nommer un connecteur, consultez les consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un connecteur est immuable.
LOCATION : emplacement du cluster Connect.
CONNECT_CLUSTER_ID : ID du cluster Connect où le connecteur est créé.
CONFIG_FILE : chemin d'accès à un fichier de configuration YAML ou JSON.
Voici un exemple de fichier de configuration :
connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
Remplacez les éléments suivants :
PROJECT_ID : ID du projet Google Clouddans lequel réside l'abonnement Pub/Sub.
PUBSUB_SUBSCRIPTION_ID : ID de l'abonnement Pub/Sub à partir duquel extraire les données.
KAFKA_TOPIC_ID : ID du sujet Kafka dans lequel les données sont écrites.
Les propriétés de configuration cps.project, cps.subscription et kafka.topic sont obligatoires. Pour découvrir d'autres options de configuration, consultez Configurer le connecteur.
Terraform
Vous pouvez utiliser une ressource Terraform pour créer un connecteur.
Pour savoir comment appliquer ou supprimer une configuration Terraform, consultez Commandes Terraform de base.
Go
Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.
Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.
Java
Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Java pour Managed Service pour Apache Kafka.
Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.
Python
Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python Managed Service pour Apache Kafka.
Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.
Une fois que vous avez créé un connecteur, vous pouvez le modifier, le supprimer, le suspendre, l'arrêter ou le redémarrer.
Configurer le connecteur
Cette section décrit certaines propriétés de configuration que vous pouvez définir sur le connecteur.
Pour obtenir la liste complète des propriétés spécifiques à ce connecteur, consultez Configurations du connecteur de source Pub/Sub.
Mode Pull
Le mode d'extraction spécifie la manière dont le connecteur récupère les messages Pub/Sub. Les modes suivants sont disponibles :
Mode Pull (par défaut) : Les messages sont extraits par lots. Pour activer ce mode, définissez
cps.streamingPull.enabled=false.. Pour configurer la taille du lot, définissez la propriétécps.maxBatchSize.Pour en savoir plus sur le mode pull, consultez API Pull.
Mode Pull de streaming Permet d'obtenir le débit maximal et la latence la plus faible lors de la récupération des messages depuis Pub/Sub. Pour activer ce mode, définissez
cps.streamingPull.enabled=true.Pour en savoir plus sur le mode pull de streaming, consultez l'API StreamingPull.
Si l'extraction de flux est activée, vous pouvez ajuster les performances en définissant les propriétés de configuration suivantes :
cps.streamingPull.flowControlBytes: nombre maximal d'octets de messages en attente par tâche.cps.streamingPull.flowControlMessages: nombre maximal de messages en attente par tâche.cps.streamingPull.maxAckExtensionMs: durée maximale pendant laquelle le connecteur prolonge le délai d'abonnement, en millisecondes.cps.streamingPull.maxMsPerAckExtension: durée maximale pendant laquelle le connecteur prolonge le délai d'abonnement par prolongation, en millisecondes.cps.streamingPull.parallelStreams: nombre de flux à partir desquels extraire les messages de l'abonnement.
Point de terminaison Pub/Sub
Par défaut, le connecteur utilise le point de terminaison Pub/Sub mondial. Pour spécifier un point de terminaison, définissez la propriété cps.endpoint sur l'adresse du point de terminaison.
Pour en savoir plus sur les points de terminaison, consultez Points de terminaison Pub/Sub.
Enregistrements Kafka
Le connecteur source Pub/Sub convertit les messages Pub/Sub en enregistrements Kafka. Les sections suivantes décrivent le processus de conversion.
Clé d'enregistrement
Le convertisseur de clé doit être org.apache.kafka.connect.storage.StringConverter.
Par défaut, les clés d'enregistrement sont
null.Pour utiliser un attribut de message Pub/Sub comme clé, définissez
kafka.key.attributesur le nom de l'attribut. Exemple :kafka.key.attribute=username.Pour utiliser la clé d'ordonnancement Pub/Sub comme clé, définissez
kafka.key.attribute=orderingKey.
En-têtes d'enregistrement
Par défaut, les en-têtes d'enregistrement sont vides.
Si kafka.record.headers est défini sur true, les attributs de message Pub/Sub sont écrits en tant qu'en-têtes d'enregistrement. Pour inclure la clé de tri, définissez cps.makeOrderingKeyAttribute=true.
Valeur de l'enregistrement
Si kafka.record.headers est défini sur true ou si le message Pub/Sub ne comporte aucun attribut personnalisé, la valeur de l'enregistrement correspond aux données du message, sous forme de tableau d'octets.
Définissez le convertisseur de valeur sur org.apache.kafka.connect.converters.ByteArrayConverter.
Sinon, si kafka.record.headers est false et que le message comporte au moins un attribut personnalisé, le connecteur écrit la valeur de l'enregistrement sous la forme struct. Définissez le convertisseur de valeur sur org.apache.kafka.connect.json.JsonConverter.
La struct contient les champs suivants :
message: données du message Pub/Sub, en octets.Un champ pour chaque attribut de message Pub/Sub. Pour inclure la clé de tri, définissez
cps.makeOrderingKeyAttribute=true.
Par exemple, en supposant que le message comporte un attribut username :
{
"message":"MESSAGE_DATA",
"username":"Alice"
}
Si value.converter.schemas.enable est défini sur true, struct inclut à la fois la charge utile et le schéma :
{
"schema":
{
"type":"struct",
"fields": [
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"username"
}
],
"optional":false
},
"payload": {
"message":"MESSAGE_DATA",
"username":"Alice"
}
}
Partitions Kafka
Par défaut, le connecteur écrit dans une seule partition du sujet. Pour spécifier le nombre de partitions dans lesquelles le connecteur écrit, définissez la propriété kafka.partition.count. La valeur ne doit pas dépasser le nombre de partitions du sujet.
Pour spécifier comment le connecteur attribue les messages aux partitions, définissez la propriété kafka.partition.scheme. Pour en savoir plus, consultez Configurations du connecteur de source Pub/Sub.