Le tableau suivant répertorie les types de connecteurs Kafka Connect compatibles avec Managed Service pour Apache Kafka. Vous pouvez utiliser ces connecteurs pour intégrer Apache Kafka à vos applications et à d'autres Google Cloud services.
| Connecteur | Description | Cas d'utilisation |
|---|---|---|
| MirrorMaker 2.0 | Réplique les sujets et les données d'un cluster Kafka vers un autre cluster Kafka cluster. | Réplication des données, reprise après sinistre, migration des données |
| Récepteur BigQuery | Diffuse les données des sujets Kafka vers une table BigQuery. | Entreposage de données, analyses |
| Récepteur Cloud Storage | Diffuse les données des sujets Kafka vers un bucket Cloud Storage. | Ingestion de lacs de données, archivage de données |
| Récepteur Pub/Sub | Diffuse les données des sujets Kafka vers un sujet Pub/Sub. | Intégration de services, notifications en temps réel |
| Source Pub/Sub | Diffuse les messages d'un abonnement Pub/Sub vers un sujet Kafka topic. | Ingestion de données en temps réel, architectures basées sur des événements |
Visiteurs ayant déjà réalisé une conversion
Les visiteurs ayant déjà réalisé une conversion sont responsables de la sérialisation et de la désérialisation des données d'enregistrement Kafka. Ils effectuent la traduction entre le format d'octet brut trouvé sur les sujets Kafka et la représentation interne des données structurées utilisée par Kafka Connect.
Pour les connecteurs de récepteur, les visiteurs ayant déjà réalisé une conversion désérialisent les données du format de transmission du sujet au format de données interne de Kafka Connect, que le connecteur utilise pour écrire dans le système cible.
Pour les connecteurs source, les visiteurs ayant déjà réalisé une conversion sérialisent les données du format de données interne de Kafka Connect au format de transmission spécifié pour le sujet Kafka.
Les visiteurs ayant déjà réalisé une conversion s'assurent que le connecteur lit ou écrit les enregistrements Kafka dans un format compatible avec le système externe.
Lorsque vous configurez un connecteur, définissez les propriétés suivantes :
Visiteur ayant déjà réalisé une conversion de clé (
key.converter) : visiteur ayant déjà réalisé une conversion à utiliser lors de la sérialisation et de la désérialisation des clés d'enregistrement Kafka.Visiteur ayant déjà réalisé une conversion de valeur (
value.converter) : visiteur ayant déjà réalisé une conversion à utiliser lors de la sérialisation et de la désérialisation des valeurs d'enregistrement Kafka.
Si vous ne spécifiez pas de visiteur ayant déjà réalisé une conversion, le type par défaut est org.apache.kafka.connect.converters.ByteArrayConverter, qui transmet les données dans leur format d'octet brut.
Visiteurs ayant déjà réalisé une conversion compatibles
Managed Service pour Apache Kafka est compatible avec les visiteurs ayant déjà réalisé une conversion intégrés suivants :
| Visiteur ayant déjà réalisé une conversion | Format |
|---|---|
io.confluent.connect.avro.AvroConverter |
Apache Avro |
org.apache.kafka.connect.converters.BooleanConverter |
Booléen |
org.apache.kafka.connect.converters.ByteArrayConverter |
Tableau d'octets Type de visiteur ayant déjà réalisé une conversion par défaut. Conserve le contenu exact des messages entre deux systèmes. |
org.apache.kafka.connect.converters.DoubleConverter |
Double |
org.apache.kafka.connect.converters.FloatConverter |
Float |
org.apache.kafka.connect.converters.IntegerConverter |
Integer |
org.apache.kafka.connect.json.JsonConverter |
JSON Pour les données JSON sans schéma, définissez également
|
org.apache.kafka.connect.converters.LongConverter |
Long |
org.apache.kafka.connect.converters.ShortConverter |
Short |
org.apache.kafka.connect.storage.StringConverter |
Chaîne |
Le choix du visiteur ayant déjà réalisé une conversion dépend du type de connecteur et des données que vous stockez dans Kafka. Pour en savoir plus, consultez la documentation du connecteur spécifique.
Tâches
Un connecteur transfère des données en créant une ou plusieurs tâches qui fonctionnent en parallèle. Pour définir une limite supérieure au nombre de tâches qu'un connecteur crée, définissez la propriété de configuration tasks.max du connecteur. Le connecteur peut créer moins de tâches que cette valeur.
L'augmentation de la valeur de tasks.max peut améliorer le débit, mais aussi accroître la consommation de ressources (processeur et mémoire). La valeur optimale dépend de la charge de travail et des ressources allouées aux nœuds de calcul de votre cluster Connect. Pour les connecteurs de récepteur, le nombre de partitions de sujet Kafka peut également affecter le parallélisme.
Règle de redémarrage des tâches
Vous pouvez définir une règle de redémarrage des tâches pour un connecteur, qui détermine le comportement en cas d'échec. Les connecteurs sont compatibles avec les règles suivantes :
Ne jamais redémarrer. Le connecteur ne redémarre pas les tâches ayant échoué. Il s'agit du comportement par défaut. Cette règle est utile pour le débogage ou dans les situations où une intervention manuelle est requise après une erreur.
Redémarrer avec un intervalle exponentiel entre les tentatives. Le connecteur redémarre une tâche ayant échoué après un délai (appelé période d'intervalle). Le délai augmente de manière exponentielle à chaque échec suivant. Cette règle est recommandée pour la plupart des charges de travail de production.
Si vous utilisez la règle d'intervalle exponentiel entre les tentatives, définissez également des valeurs pour l'intervalle minimal et maximal. L'intervalle minimal doit être supérieur à 60 secondes, et l'intervalle maximal doit être inférieur à 7 200 secondes.
Transformations et prédicats
Managed Service pour Apache Kafka est compatible avec les transformations et les prédicats Kafka Connect par défaut.
Les transformations vous permettent de modifier des messages individuels avant qu'ils ne soient envoyés à Managed Service pour Apache Kafka (pour les connecteurs source) ou au système externe (pour les connecteurs de récepteur). Vous pouvez utiliser une transformation pour masquer des données sensibles, ajouter des horodatages ou renommer des champs.
Les prédicats vous permettent de filtrer les données en fonction de conditions spécifiques, en déterminant les messages auxquels une transformation s'applique en fonction des propriétés du message.
Par exemple, pour configurer un connecteur de récepteur afin qu'il ignore les messages contenant une clé d'en-tête DoNotProcess, ajoutez la configuration suivante :
transforms=dropMessage
transforms.dropMessage.type=org.apache.kafka.connect.transforms.Filter
transforms.dropMessage.predicate=hasKey
predicates=hasKey
predicates.hasKey.type=org.apache.kafka.connect.transforms.predicates.HasHeaderKey
predicates.hasKey.name=DoNotProcess
Cette configuration effectue les opérations suivantes :
Configure un prédicat nommé
hasKeyde typeorg.apache.kafka.connect.transforms.predicates.HasHeaderKey. Ce prédicat correspond à tous les messages contenant un en-tête avec la cléDoNotProcess.Configure une transformation nommée
dropMessagede typeorg.apache.kafka.connect.transforms.Filter. Cette transformation supprime tous les messages qui correspondent au prédicat configuré.Associe la transformation au prédicat
hasKey. Cela garantit que seuls les messages contenant la clé d'en-têteDoNotProcesssont supprimés par la transformation.