Cette page explique comment utiliser Dataflow pour lire des données à partir de Google Cloud Managed Service pour Apache Kafka et écrire les enregistrements dans une table BigQuery. Ce tutoriel utilise le modèle Apache Kafka vers BigQuery pour créer le job Dataflow.
Présentation
Apache Kafka est une plate-forme Open Source pour les événements de traitement en flux continu. Kafka est couramment utilisé dans les architectures distribuées pour permettre la communication entre des composants faiblement couplés. Vous pouvez utiliser Dataflow pour lire des événements à partir de Kafka, les traiter et écrire les résultats dans une table BigQuery pour une analyse plus approfondie.
Managed Service pour Apache Kafka est un service Google Cloud Platform qui vous aide à exécuter des clusters Kafka sécurisés et évolutifs.
Autorisations requises
Le compte de service de l'worker Dataflow doit disposer des rôles IAM (Identity and Access Management) suivants :
- Client Managed Kafka (
roles/managedkafka.client) - Éditeur de données BigQuery (
roles/bigquery.dataEditor)
Pour en savoir plus, consultez la section Sécurité et autorisations pour Dataflow.
Créer un cluster Kafka
Lors de cette étape, vous allez créer un cluster Managed Service pour Apache Kafka. Pour en savoir plus, consultez Créer un cluster Managed Service pour Apache Kafka.
Console
Accédez à la page Service géré pour Apache Kafka > Clusters.
Cliquez sur Créer.
Dans le champ Nom du cluster, saisissez un nom pour le cluster.
Dans la liste Région, sélectionnez un emplacement pour le cluster.
Cliquez sur Créer.
gcloud
Exécutez la commande managed-kafka clusters create.
gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
Remplacez les éléments suivants :
CLUSTER: nom du clusterREGION: région où vous avez créé le sous-réseauPROJECT_ID: ID de votre projetSUBNET_NAME: sous-réseau dans lequel vous souhaitez déployer le cluster
La création d'un cluster prend généralement entre 20 et 30 minutes.
Créer un sujet Kafka
Une fois le cluster Managed Service pour Apache Kafka créé, créez un sujet.
Console
Accédez à la page Service géré pour Apache Kafka > Clusters.
Cliquez sur le nom du cluster.
Sur la page "Détails du cluster", cliquez sur Créer un sujet.
Dans le champ Nom du thème, saisissez un nom pour le thème.
Cliquez sur Créer.
gcloud
Exécutez la commande managed-kafka topics create.
gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
Remplacez les éléments suivants :
TOPIC_NAME: nom du sujet à créer.
Créer une table BigQuery
Au cours de cette étape, vous allez créer une table BigQuery avec le schéma suivant :
| Nom de la colonne | Type de données |
|---|---|
name |
STRING |
customer_id |
INTEGER |
Si vous n'avez pas encore d'ensemble de données BigQuery, commencez par en créer un. Pour en savoir plus, consultez Créer des ensembles de données. Créez ensuite une table vide :
Console
Accédez à la page BigQuery.
Dans le volet Explorateur, développez votre projet, puis sélectionnez un ensemble de données.
Dans la section Informations sur l'ensemble de données, cliquez sur Créer une table.
Dans la liste Créer une table à partir de, sélectionnez Table vide.
Dans le champ Table, saisissez le nom de la table.
Dans la section Schéma, cliquez sur Modifier sous forme de texte.
Collez la définition de schéma suivante :
name:STRING, customer_id:INTEGERCliquez sur Créer une table.
gcloud
Exécutez la commande bq mk.
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
Remplacez les éléments suivants :
PROJECT_ID: ID de votre projetDATASET_NAME: nom de l'ensemble de donnéesTABLE_NAME: nom de la table à créer
Exécuter la tâche Dataflow
Après avoir créé le cluster Kafka et la table BigQuery, exécutez le modèle Dataflow.
Console
Commencez par obtenir l'adresse du serveur d'amorçage du cluster :
Dans la console Google Cloud , accédez à la page Clusters.
Cliquez sur le nom du cluster.
Cliquez sur l'onglet Configurations.
Copiez l'adresse du serveur d'amorçage à partir de l'URL d'amorçage.
Ensuite, exécutez le modèle pour créer le job Dataflow :
Accédez à la page Dataflow > Tâches.
Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
Dans le champ Nom du job, saisissez
kafka-to-bq.Pour Point de terminaison régional, sélectionnez la région dans laquelle se trouve votre cluster Managed Service pour Apache Kafka.
Sélectionnez le modèle "Kafka vers BigQuery".
Saisissez les paramètres de modèle suivants :
- Serveur d'amorçage Kafka : adresse du serveur d'amorçage
- Sujet Kafka source : nom du sujet à lire
- Mode d'authentification de la source Kafka :
APPLICATION_DEFAULT_CREDENTIALS - Format de message Kafka :
JSON - Stratégie de nommage des tables :
SINGLE_TABLE_NAME - Table de sortie BigQuery : table BigQuery, formatée comme suit :
PROJECT_ID:DATASET_NAME.TABLE_NAME
Sous File d'attente des messages non distribués, cochez Écrire les erreurs dans BigQuery.
Saisissez le nom d'une table BigQuery pour la file d'attente des messages non distribués, au format suivant :
PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME.Ne créez pas cette table à l'avance. Le pipeline le crée.
Cliquez sur Run Job (Exécuter la tâche).
gcloud
Exécutez la commande dataflow flex-template run.
gcloud dataflow flex-template run kafka-to-bq \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters \ readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\ persistKafkaKey=false,\ writeMode=SINGLE_TABLE_NAME,\ kafkaReadOffset=earliest,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
Remplacez les variables suivantes :
LOCATION: région dans laquelle se trouve votre service géré pour Apache KafkaPROJECT_ID: nom de votre projet Google Cloud PlatformCLUSTER_ID: nom du clusterTOPIC: nom du sujet KafkaDATASET_NAME: nom de l'ensemble de donnéesTABLE_NAME: nom de la tableERROR_TABLE_NAME: nom de la table BigQuery pour la file d'attente des messages non distribués
Ne créez pas la table pour la file d'attente de lettres mortes à l'avance. Le pipeline le crée.
Envoyer des messages à Kafka
Une fois la tâche Dataflow lancée, vous pouvez envoyer des messages à Kafka. Le pipeline les écrit ensuite dans BigQuery.
Créez une VM dans le même sous-réseau que le cluster Kafka et installez les outils de ligne de commande Kafka. Pour obtenir des instructions détaillées, consultez Configurer une machine cliente dans Publier et consommer des messages avec la CLI.
Exécutez la commande suivante pour écrire des messages dans le sujet Kafka :
kafka-console-producer.sh \ --topic TOPIC \ --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \ --producer.config client.properties
Remplacez les variables suivantes :
TOPIC: nom du sujet KafkaCLUSTER_ID: nom du clusterLOCATION: région dans laquelle se trouve votre cluster.PROJECT_ID: nom de votre projet Google Cloud Platform
À l'invite, saisissez les lignes de texte suivantes pour envoyer des messages à Kafka :
{"name": "Alice", "customer_id": 1} {"name": "Bob", "customer_id": 2} {"name": "Charles", "customer_id": 3}
Utiliser une file d'attente de lettres mortes
Pendant l'exécution du job, il est possible que le pipeline ne parvienne pas à écrire des messages individuels dans BigQuery. Voici les erreurs possibles :
- Erreurs de sérialisation, y compris un format JSON mal formaté.
- Erreurs de conversion de type, causées par une incohérence dans le schéma de la table et les données JSON.
- Champs supplémentaires dans les données JSON qui ne sont pas présents dans le schéma de la table.
Ces erreurs n'entraînent pas l'échec de la tâche et n'apparaissent pas comme des erreurs dans le journal de tâche Dataflow. Au lieu de cela, le pipeline utilise une file d'attente de lettres mortes pour gérer ces types d'erreurs.
Pour activer la file d'attente des messages non distribués lorsque vous exécutez le modèle, définissez les paramètres de modèle suivants :
useBigQueryDLQ:trueoutputDeadletterTable: nom complet de la table BigQuery (par exemple,my-project:dataset1.errors)
Le pipeline crée automatiquement la table. Si une erreur se produit lors du traitement d'un message Kafka, le pipeline écrit une entrée d'erreur dans la table.
Exemples de messages d'erreur :
| Type d'erreur | Données d'événement | errorMessage |
|---|---|---|
| Erreur de sérialisation | "Hello world" | Échec de la sérialisation de JSON vers la ligne de table : "Hello world" |
| Erreur de conversion du type | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
| Champ inconnu | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 } |
Utiliser les types de données BigQuery
En interne, le connecteur d'E/S Kafka convertit les charges utiles des messages JSON en objets TableRow Apache Beam et traduit les valeurs des champs TableRow en types BigQuery.
Le tableau suivant présente les représentations JSON des types de données BigQuery.
| Type BigQuery | Représentation JSON |
|---|---|
ARRAY |
[1.2,3] |
BOOL |
true |
DATE |
"2022-07-01" |
DATETIME |
"2022-07-01 12:00:00.00" |
DECIMAL |
5.2E11 |
FLOAT64 |
3.142 |
GEOGRAPHY |
"POINT(1 2)"Spécifiez la zone géographique à l'aide de texte connu (WKT) ou de GeoJSON, mis en forme en tant que chaîne. Pour en savoir plus, consultez Charger des données géospatiales. |
INT64 |
10 |
INTERVAL |
"0-13 370 48:61:61" |
STRING |
"string_val" |
TIMESTAMP |
"2022-07-01T12:00:00.00Z"Utilisez la méthode JavaScript |
Données structurées
Si vos messages JSON suivent un schéma cohérent, vous pouvez représenter les objets JSON à l'aide du type de données STRUCT dans BigQuery.
Dans l'exemple suivant, le champ answers est un objet JSON avec deux sous-champs, a et b :
{"name":"Emily","answers":{"a":"yes","b":"no"}}
L'instruction SQL suivante crée une table BigQuery avec un schéma compatible :
CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);
La table obtenue ressemble à ceci :
+-------+----------------------+
| name | answers |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
Données semi-structurées
Si vos messages JSON ne suivent pas un schéma strict, envisagez de les stocker dans BigQuery en tant que type de données JSON.
En stockant des données JSON en tant que type JSON, vous n'avez pas besoin de définir le schéma à l'avance. Après l'ingestion des données, vous pouvez les interroger à l'aide des opérateurs d'accès aux champs (notation par points) et d'accès aux tableaux dans GoogleSQL. Pour en savoir plus, consultez Utiliser des données JSON en langage GoogleSQL.
Utiliser une UDF pour transformer les données
Ce tutoriel suppose que les messages Kafka sont au format JSON et que le schéma de la table BigQuery correspond aux données JSON, sans qu'aucune transformation n'ait été appliquée aux données.
Vous pouvez éventuellement fournir une fonction définie par l'utilisateur (UDF) en JavaScript qui transforme les données avant leur écriture dans BigQuery. Cette fonction permet également d'effectuer un traitement supplémentaire, tel que le filtrage, la suppression des informations personnelles ou l'enrichissement des données à l'aide de champs supplémentaires.
Pour en savoir plus, consultez Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.