Écrire des données de Kafka vers BigQuery à l'aide de Dataflow

Cette page explique comment utiliser Dataflow pour lire des données à partir de Google Cloud Managed Service pour Apache Kafka et écrire les enregistrements dans une table BigQuery. Ce tutoriel utilise le modèle Apache Kafka vers BigQuery pour créer le job Dataflow.

Présentation

Apache Kafka est une plate-forme Open Source pour les événements de traitement en flux continu. Kafka est couramment utilisé dans les architectures distribuées pour permettre la communication entre des composants faiblement couplés. Vous pouvez utiliser Dataflow pour lire des événements à partir de Kafka, les traiter et écrire les résultats dans une table BigQuery pour une analyse plus approfondie.

Managed Service pour Apache Kafka est un service Google Cloud Platform qui vous aide à exécuter des clusters Kafka sécurisés et évolutifs.

Lire des événements Kafka dans BigQuery
Architecture événementielle avec Apache Kafka

Autorisations requises

Le compte de service de l'worker Dataflow doit disposer des rôles IAM (Identity and Access Management) suivants :

  • Client Managed Kafka (roles/managedkafka.client)
  • Éditeur de données BigQuery (roles/bigquery.dataEditor)

Pour en savoir plus, consultez la section Sécurité et autorisations pour Dataflow.

Créer un cluster Kafka

Lors de cette étape, vous allez créer un cluster Managed Service pour Apache Kafka. Pour en savoir plus, consultez Créer un cluster Managed Service pour Apache Kafka.

Console

  1. Accédez à la page Service géré pour Apache Kafka > Clusters.

    accéder aux clusters

  2. Cliquez sur Créer.

  3. Dans le champ Nom du cluster, saisissez un nom pour le cluster.

  4. Dans la liste Région, sélectionnez un emplacement pour le cluster.

  5. Cliquez sur Créer.

gcloud

Exécutez la commande managed-kafka clusters create.

gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

Remplacez les éléments suivants :

  • CLUSTER : nom du cluster
  • REGION : région où vous avez créé le sous-réseau
  • PROJECT_ID : ID de votre projet
  • SUBNET_NAME : sous-réseau dans lequel vous souhaitez déployer le cluster

La création d'un cluster prend généralement entre 20 et 30 minutes.

Créer un sujet Kafka

Une fois le cluster Managed Service pour Apache Kafka créé, créez un sujet.

Console

  1. Accédez à la page Service géré pour Apache Kafka > Clusters.

    accéder aux clusters

  2. Cliquez sur le nom du cluster.

  3. Sur la page "Détails du cluster", cliquez sur Créer un sujet.

  4. Dans le champ Nom du thème, saisissez un nom pour le thème.

  5. Cliquez sur Créer.

gcloud

Exécutez la commande managed-kafka topics create.

gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

Remplacez les éléments suivants :

  • TOPIC_NAME : nom du sujet à créer.

Créer une table BigQuery

Au cours de cette étape, vous allez créer une table BigQuery avec le schéma suivant :

Nom de la colonne Type de données
name STRING
customer_id INTEGER

Si vous n'avez pas encore d'ensemble de données BigQuery, commencez par en créer un. Pour en savoir plus, consultez Créer des ensembles de données. Créez ensuite une table vide :

Console

  1. Accédez à la page BigQuery.

    Accéder à BigQuery

  2. Dans le volet Explorateur, développez votre projet, puis sélectionnez un ensemble de données.

  3. Dans la section Informations sur l'ensemble de données, cliquez sur Créer une table.

  4. Dans la liste Créer une table à partir de, sélectionnez Table vide.

  5. Dans le champ Table, saisissez le nom de la table.

  6. Dans la section Schéma, cliquez sur Modifier sous forme de texte.

  7. Collez la définition de schéma suivante :

    name:STRING,
    customer_id:INTEGER
    
  8. Cliquez sur Créer une table.

gcloud

Exécutez la commande bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Remplacez les éléments suivants :

  • PROJECT_ID : ID de votre projet
  • DATASET_NAME : nom de l'ensemble de données
  • TABLE_NAME : nom de la table à créer

Exécuter la tâche Dataflow

Après avoir créé le cluster Kafka et la table BigQuery, exécutez le modèle Dataflow.

Console

Commencez par obtenir l'adresse du serveur d'amorçage du cluster :

  1. Dans la console Google Cloud , accédez à la page Clusters.

    accéder aux clusters

  2. Cliquez sur le nom du cluster.

  3. Cliquez sur l'onglet Configurations.

  4. Copiez l'adresse du serveur d'amorçage à partir de l'URL d'amorçage.

Ensuite, exécutez le modèle pour créer le job Dataflow :

  1. Accédez à la page Dataflow > Tâches.

    Accéder aux tâches

  2. Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).

  3. Dans le champ Nom du job, saisissez kafka-to-bq.

  4. Pour Point de terminaison régional, sélectionnez la région dans laquelle se trouve votre cluster Managed Service pour Apache Kafka.

  5. Sélectionnez le modèle "Kafka vers BigQuery".

  6. Saisissez les paramètres de modèle suivants :

    • Serveur d'amorçage Kafka : adresse du serveur d'amorçage
    • Sujet Kafka source : nom du sujet à lire
    • Mode d'authentification de la source Kafka : APPLICATION_DEFAULT_CREDENTIALS
    • Format de message Kafka : JSON
    • Stratégie de nommage des tables : SINGLE_TABLE_NAME
    • Table de sortie BigQuery : table BigQuery, formatée comme suit : PROJECT_ID:DATASET_NAME.TABLE_NAME
  7. Sous File d'attente des messages non distribués, cochez Écrire les erreurs dans BigQuery.

  8. Saisissez le nom d'une table BigQuery pour la file d'attente des messages non distribués, au format suivant : PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME.

    Ne créez pas cette table à l'avance. Le pipeline le crée.

  9. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Exécutez la commande dataflow flex-template run.

gcloud dataflow flex-template run kafka-to-bq \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
persistKafkaKey=false,\
writeMode=SINGLE_TABLE_NAME,\
kafkaReadOffset=earliest,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

Remplacez les variables suivantes :

  • LOCATION : région dans laquelle se trouve votre service géré pour Apache Kafka
  • PROJECT_ID : nom de votre projet Google Cloud Platform
  • CLUSTER_ID : nom du cluster
  • TOPIC : nom du sujet Kafka
  • DATASET_NAME : nom de l'ensemble de données
  • TABLE_NAME : nom de la table
  • ERROR_TABLE_NAME : nom de la table BigQuery pour la file d'attente des messages non distribués

Ne créez pas la table pour la file d'attente de lettres mortes à l'avance. Le pipeline le crée.

Envoyer des messages à Kafka

Une fois la tâche Dataflow lancée, vous pouvez envoyer des messages à Kafka. Le pipeline les écrit ensuite dans BigQuery.

  1. Créez une VM dans le même sous-réseau que le cluster Kafka et installez les outils de ligne de commande Kafka. Pour obtenir des instructions détaillées, consultez Configurer une machine cliente dans Publier et consommer des messages avec la CLI.

  2. Exécutez la commande suivante pour écrire des messages dans le sujet Kafka :

    kafka-console-producer.sh \
     --topic TOPIC \
     --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \
     --producer.config client.properties

    Remplacez les variables suivantes :

    • TOPIC : nom du sujet Kafka
    • CLUSTER_ID : nom du cluster
    • LOCATION : région dans laquelle se trouve votre cluster.
    • PROJECT_ID : nom de votre projet Google Cloud Platform
  3. À l'invite, saisissez les lignes de texte suivantes pour envoyer des messages à Kafka :

    {"name": "Alice", "customer_id": 1}
    {"name": "Bob", "customer_id": 2}
    {"name": "Charles", "customer_id": 3}
    

Utiliser une file d'attente de lettres mortes

Pendant l'exécution du job, il est possible que le pipeline ne parvienne pas à écrire des messages individuels dans BigQuery. Voici les erreurs possibles :

  • Erreurs de sérialisation, y compris un format JSON mal formaté.
  • Erreurs de conversion de type, causées par une incohérence dans le schéma de la table et les données JSON.
  • Champs supplémentaires dans les données JSON qui ne sont pas présents dans le schéma de la table.

Ces erreurs n'entraînent pas l'échec de la tâche et n'apparaissent pas comme des erreurs dans le journal de tâche Dataflow. Au lieu de cela, le pipeline utilise une file d'attente de lettres mortes pour gérer ces types d'erreurs.

Pour activer la file d'attente des messages non distribués lorsque vous exécutez le modèle, définissez les paramètres de modèle suivants :

  • useBigQueryDLQ : true
  • outputDeadletterTable : nom complet de la table BigQuery (par exemple, my-project:dataset1.errors)

Le pipeline crée automatiquement la table. Si une erreur se produit lors du traitement d'un message Kafka, le pipeline écrit une entrée d'erreur dans la table.

Exemples de messages d'erreur :

Type d'erreur Données d'événement errorMessage
Erreur de sérialisation "Hello world" Échec de la sérialisation de JSON vers la ligne de table : "Hello world"
Erreur de conversion du type {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
Champ inconnu {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

Utiliser les types de données BigQuery

En interne, le connecteur d'E/S Kafka convertit les charges utiles des messages JSON en objets TableRow Apache Beam et traduit les valeurs des champs TableRow en types BigQuery.

Le tableau suivant présente les représentations JSON des types de données BigQuery.

Type BigQuery Représentation JSON
ARRAY [1.2,3]
BOOL true
DATE "2022-07-01"
DATETIME "2022-07-01 12:00:00.00"
DECIMAL 5.2E11
FLOAT64 3.142
GEOGRAPHY "POINT(1 2)"

Spécifiez la zone géographique à l'aide de texte connu (WKT) ou de GeoJSON, mis en forme en tant que chaîne. Pour en savoir plus, consultez Charger des données géospatiales.

INT64 10
INTERVAL "0-13 370 48:61:61"
STRING "string_val"
TIMESTAMP "2022-07-01T12:00:00.00Z"

Utilisez la méthode JavaScript Date.toJSON pour mettre en forme la valeur.

Données structurées

Si vos messages JSON suivent un schéma cohérent, vous pouvez représenter les objets JSON à l'aide du type de données STRUCT dans BigQuery.

Dans l'exemple suivant, le champ answers est un objet JSON avec deux sous-champs, a et b :

{"name":"Emily","answers":{"a":"yes","b":"no"}}

L'instruction SQL suivante crée une table BigQuery avec un schéma compatible :

CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);

La table obtenue ressemble à ceci :

+-------+----------------------+
| name  |       answers        |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

Données semi-structurées

Si vos messages JSON ne suivent pas un schéma strict, envisagez de les stocker dans BigQuery en tant que type de données JSON. En stockant des données JSON en tant que type JSON, vous n'avez pas besoin de définir le schéma à l'avance. Après l'ingestion des données, vous pouvez les interroger à l'aide des opérateurs d'accès aux champs (notation par points) et d'accès aux tableaux dans GoogleSQL. Pour en savoir plus, consultez Utiliser des données JSON en langage GoogleSQL.

Utiliser une UDF pour transformer les données

Ce tutoriel suppose que les messages Kafka sont au format JSON et que le schéma de la table BigQuery correspond aux données JSON, sans qu'aucune transformation n'ait été appliquée aux données.

Vous pouvez éventuellement fournir une fonction définie par l'utilisateur (UDF) en JavaScript qui transforme les données avant leur écriture dans BigQuery. Cette fonction permet également d'effectuer un traitement supplémentaire, tel que le filtrage, la suppression des informations personnelles ou l'enrichissement des données à l'aide de champs supplémentaires.

Pour en savoir plus, consultez Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Étapes suivantes