Les connecteurs source Pub/Sub diffusent des messages de Pub/Sub vers Kafka, ce qui vous permet d'intégrer Pub/Sub à vos applications et pipelines de données basés sur Kafka.
Voici des exemples de cas d'utilisation des connecteurs source Pub/Sub :
Ingestion de données en temps réel. Publiez des données à partir de services cloud ou d'autres applications dans Pub/Sub, puis répliquez-les dans Kafka pour le traitement par flux.
Architectures basées sur des événements. Déclenchez le traitement basé sur Kafka à partir des messages publiés dans Pub/Sub.
Le connecteur lit les messages d'un abonnement Pub/Sub, convertit chaque message en enregistrement Kafka et écrit les enregistrements dans un sujet Kafka. Par défaut, le connecteur crée des enregistrements Kafka comme suit :
- La clé d'enregistrement Kafka est
null. - La valeur d'enregistrement Kafka correspond aux données du message Pub/Sub en tant qu'octets.
- Les en-têtes d'enregistrement Kafka sont vides.
Toutefois, vous pouvez configurer ce comportement. Pour en savoir plus, consultez Configurer le connecteur.
Avant de commencer
Avant de créer un connecteur source Pub/Sub, assurez-vous de disposer des éléments suivants :
Un sujet Pub/Sub avec un abonnement.
Un sujet Kafka dans le cluster Kafka.
Un cluster Connect. Lorsque vous créez le cluster Connect, définissez le cluster Managed Service pour Apache Kafka comme cluster Kafka principal.
Rôles et autorisations requis
Pour obtenir les autorisations nécessaires pour créer un connecteur, demandez à votre administrateur de vous accorder le rôle IAM Éditeur de connecteur Managed Kafka (roles/managedkafka.connectorEditor) sur votre projet.
Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.
Ce rôle prédéfini contient les autorisations requises pour créer un connecteur. Pour connaître les autorisations exactes requises, développez la section Autorisations requises :
Autorisations requises
Les autorisations suivantes sont requises pour créer un connecteur :
-
Créer un connecteur :
managedkafka.connectors.create
Vous pouvez également obtenir ces autorisations avec des rôles personnalisés ou d'autres rôles prédéfinis.
Accorder des autorisations pour lire à partir de Pub/Sub
Le compte de service Managed Kafka doit être autorisé à lire les messages de l'abonnement Pub/Sub. Accordez les rôles IAM suivants au compte de service sur le projet contenant l'abonnement Pub/Sub :
- Abonné Pub/Sub (
roles/pubsub.subscriber) - Lecteur Pub/Sub (
roles/pubsub.viewer)
Le compte de service Managed Kafka a le format suivant :
service-PROJECT_NUMBER@gcp-sa-managedkafka.iam.gserviceaccount.com,
où PROJECT_NUMBER correspond au numéro de projet du cluster Connect.
Si votre cluster Connect se trouve dans un projet différent de votre cluster Managed Service pour Apache Kafka cluster, consultez Créer un cluster Connect dans un autre projet.
Créer un connecteur source Pub/Sub
Console
Dans la Google Cloud console, accédez à la page Connect Clusters.
Cliquez sur le cluster Connect dans lequel vous souhaitez créer le connecteur.
Cliquez sur Create connector (Créer un connecteur).
Saisissez une chaîne pour le nom du connecteur.
Pour obtenir des instructions sur la façon de nommer un connecteur, consultez Consignes de dénomination des ressources Managed Service pour Apache Kafka.
Pour Connector plugin (Plug-in de connecteur), sélectionnez Pub/Sub Source (Source Pub/Sub).
Dans la liste Cloud Pub/Sub subscription (Abonnement Cloud Pub/Sub), sélectionnez un abonnement Pub/Sub. Le connecteur extrait les messages de cet abonnement. L'abonnement s'affiche sous la forme d'un nom complet de ressource :
projects/{project}/subscriptions/{subscription}.Dans la liste Kafka topic (Sujet Kafka), sélectionnez le sujet Kafka dans lequel les messages sont écrits.
(Facultatif) Dans la zone Configurations, ajoutez des propriétés de configuration ou modifiez les propriétés par défaut. Pour en savoir plus, consultez Configurer le connecteur.
Sélectionnez la Task restart policy (Règle de redémarrage des tâches). Pour en savoir plus, consultez Règle de redémarrage des tâches.
Cliquez sur Create (Créer).
gcloud
Exécutez la
gcloud managed-kafka connectors createcommande :gcloud managed-kafka connectors create CONNECTOR_ID \ --location=LOCATION \ --connect-cluster=CONNECT_CLUSTER_ID \ --config-file=CONFIG_FILERemplacez les éléments suivants :
CONNECTOR_ID : ID ou nom du connecteur. Pour obtenir des instructions sur la façon de nommer un connecteur, consultez Consignes de dénomination des ressources Managed Service pour Apache Kafka. Le nom d'un connecteur est immuable.
LOCATION : emplacement du cluster Connect.
CONNECT_CLUSTER_ID : ID du cluster Connect dans lequel le connecteur est créé.
CONFIG_FILE : chemin d'accès à un fichier de configuration YAML ou JSON.
Voici un exemple de fichier de configuration :
connector.class: "com.google.pubsub.kafka.source.CloudPubSubSourceConnector"
cps.project: "PROJECT_ID"
cps.subscription: "PUBSUB_SUBSCRIPTION_ID"
kafka.topic: "KAFKA_TOPIC_ID"
value.converter: "org.apache.kafka.connect.converters.ByteArrayConverter"
key.converter: "org.apache.kafka.connect.storage.StringConverter"
tasks.max: "3"
Remplacez les éléments suivants :
PROJECT_ID : ID du projet dans lequel réside l'abonnement Pub/Sub. Google Cloud
PUBSUB_SUBSCRIPTION_ID : ID de l' abonnement Pub/Sub à partir duquel extraire les données.
KAFKA_TOPIC_ID : ID du sujet Kafka dans lequel les données sont écrites.
Les propriétés de configuration cps.project, cps.subscription et kafka.topic sont obligatoires. Pour obtenir des options de configuration supplémentaires, consultez
Configurer le connecteur.
Terraform
Vous pouvez utiliser une ressource Terraform pour créer un connecteur.
Pour savoir comment appliquer ou supprimer une configuration Terraform, consultez Commandes Terraform de base.
Go
Avant d'essayer cet exemple, suivez les instructions de configuration pour Go dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Go.
Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application(ADC). Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.
Java
Avant d'essayer cet exemple, suivez les instructions de configuration pour Java dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Managed Service pour Apache Kafka en langage Java.
Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.
Python
Avant d'essayer cet exemple, suivez les instructions de configuration pour Python dans Installer les bibliothèques clientes. Pour en savoir plus, consultez la documentation de référence de l'API Python de Managed Service pour Apache Kafka.
Pour vous authentifier auprès de Managed Service pour Apache Kafka, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer les ADC pour un environnement de développement local.
Une fois que vous avez créé un connecteur, vous pouvez le modifier, le supprimer, le mettre en veille, l'arrêter ou le redémarrer.
Configurer le connecteur
Cette section décrit certaines propriétés de configuration que vous pouvez définir sur le connecteur.
Pour obtenir la liste complète des propriétés spécifiques à ce connecteur, consultez Configurations du connecteur source Pub/Sub.
Mode pull
Le mode pull spécifie comment le connecteur récupère les messages Pub/Sub. Les modes suivants sont compatibles :
Mode pull (par défaut) Les messages sont extraits par lot. Pour activer ce mode, définissez
cps.streamingPull.enabled=false.Pour configurer la taille de lot, définissez la propriétécps.maxBatchSize.Pour en savoir plus sur le mode pull, consultez l'API Pull.
Mode Streaming Pull Permet d'obtenir le débit maximal et la latence la plus faible lors de la récupération des messages à partir de Pub/Sub. Pour activer ce mode, définissez
cps.streamingPull.enabled=true.Pour en savoir plus sur le mode Streaming Pull, consultez l'API StreamingPull.
Si le mode Streaming Pull est activé, vous pouvez régler les performances en définissant les propriétés de configuration suivantes :
cps.streamingPull.flowControlBytes: nombre maximal d'octets de messages en attente par tâche.cps.streamingPull.flowControlMessages: nombre maximal de messages en attente par tâche.cps.streamingPull.maxAckExtensionMs: délai maximal pendant lequel le connecteur prolonge le délai d'abonnement, en millisecondes.cps.streamingPull.maxMsPerAckExtension: délai maximal pendant lequel le connecteur prolonge le délai d'abonnement par extension, en millisecondes.cps.streamingPull.parallelStreams: nombre de flux à partir desquels extraire les messages de l'abonnement.
Point de terminaison Pub/Sub
Par défaut, le connecteur utilise le point de terminaison Pub/Sub mondial. Pour spécifier un point de terminaison, définissez la propriété cps.endpoint sur l'adresse du point de terminaison.
Pour en savoir plus sur les points de terminaison, consultez
Points de terminaison Pub/Sub.
Partitions Kafka
Par défaut, le connecteur écrit dans une seule partition du sujet. Pour spécifier le nombre de partitions dans lesquelles le connecteur écrit, définissez la propriété kafka.partition.count. La valeur ne doit pas dépasser le nombre de partitions du sujet
.
Pour spécifier comment le connecteur attribue des messages aux partitions, définissez la propriété kafka.partition.scheme. Pour en savoir plus, consultez
Configurations du connecteur source Pub/Sub.
Visiteurs ayant déjà réalisé une conversion
Définissez le convertisseur de clé sur org.apache.kafka.connect.storage.StringConverter.
En fonction de la configuration du connecteur, définissez le convertisseur de valeur sur l'une des valeurs suivantes :
org.apache.kafka.connect.converters.ByteArrayConverterorg.apache.kafka.connect.json.JsonConverter
Pour en savoir plus, consultez Valeur d'enregistrement.
Conversion par SMS
Le connecteur source Pub/Sub convertit les messages Pub/Sub en enregistrements Kafka. Les sections suivantes décrivent le processus de conversion.
Clé d'enregistrement
Le convertisseur de clé doit être org.apache.kafka.connect.storage.StringConverter.
Par défaut, les clés d'enregistrement sont
null.Pour utiliser un attribut de message Pub/Sub comme clé, définissez
kafka.key.attributesur le nom de l'attribut. Par exemple,kafka.key.attribute=username.Pour utiliser la clé de tri Pub/Sub comme clé, définissez
kafka.key.attribute=orderingKey.
En-têtes d'enregistrement
Par défaut, les en-têtes d'enregistrement sont vides.
Si kafka.record.headers est true, les attributs de message Pub/Sub sont écrits en tant qu'en-têtes d'enregistrement. Pour inclure la clé de tri, définissez cps.makeOrderingKeyAttribute=true.
Valeur d'enregistrement
Les valeurs d'enregistrement sont écrites sous forme de tableaux d'octets ou de types struct.
Valeurs d'enregistrement de tableau d'octets
Si kafka.record.headers est true ou si le message Pub/Sub ne comporte aucun attribut personnalisé, le connecteur écrit les données du message sous forme de tableau d'octets.
Définissez le convertisseur de valeur sur org.apache.kafka.connect.converters.ByteArrayConverter.
Valeurs d'enregistrement de structure
Si kafka.record.headers est false et que le message comporte au moins un attribut personnalisé, le connecteur écrit la valeur d'enregistrement sous forme de struct. Définissez le convertisseur de valeur sur org.apache.kafka.connect.json.JsonConverter.
La struct contient les champs suivants :
message: données du message Pub/Sub, en tant qu'octets.Un champ pour chaque attribut de message Pub/Sub. Pour inclure la clé de tri, définissez
cps.makeOrderingKeyAttribute=true.
Par exemple, si le message comporte un attribut username, la valeur d'enregistrement se présente comme suit :
{
"message":"MESSAGE_DATA",
"username":"Alice"
}
Si value.converter.schemas.enable est true, la struct inclut à la fois la charge utile et le schéma :
{
"schema":
{
"type":"struct",
"fields": [
{
"type":"bytes",
"optional":false,
"field":"message"
},
{
"type":"string",
"optional":false,
"field":"username"
}
],
"optional":false
},
"payload": {
"message":"MESSAGE_DATA",
"username":"Alice"
}
}