Activer la traçabilité des données Spark dans Dataproc

Ce document explique comment activer la provenance des données pour vos tâches Spark Dataproc au niveau du projet ou du cluster.

La traçabilité des données est une fonctionnalité de Dataplex Universal Catalog qui vous permet de suivre la manière dont les données transitent par vos systèmes : leur origine, la cible de transmission, et les transformations qui leur sont appliquées.

La traçabilité des données est disponible pour tous les jobs Spark Dataproc, à l'exception des jobs SparkR et Spark Streaming. Elle est compatible avec les sources de données BigQuery et Cloud Storage. Il est inclus dans les versions d'image Dataproc sur Compute Engine 2.0.74+, 2.1.22+, 2.2.50 et ultérieures.

Une fois la fonctionnalité activée dans votre cluster Dataproc, les jobs Dataproc Spark capturent les événements de traçabilité des données et les publient dans l'API Data Lineage de Dataplex Universal Catalog. Dataproc s'intègre à l'API Data Lineage via OpenLineage, à l'aide du plug-in OpenLineage Spark.

Vous pouvez accéder aux informations sur la traçabilité des données via Dataplex Universal Catalog, en utilisant les éléments suivants :

Avant de commencer

  1. Dans la console Google Cloud , sur la page de sélection du projet, sélectionnez le projet contenant le cluster Dataproc pour lequel vous souhaitez suivre la traçabilité.

    Accéder au sélecteur de projet

  2. Activez l'API Data Lineage.

    Activer les API

Rôles requis

Si vous créez un cluster Dataproc à l'aide du compte de service de VM par défaut, il dispose du rôle Dataproc Worker, qui permet le traçage des données. Aucune autre action n'est nécessaire.

Toutefois, si vous créez un cluster Dataproc qui utilise un compte de service personnalisé, vous devez attribuer un rôle requis à ce compte pour activer le lineage des données sur le cluster, comme expliqué dans le paragraphe suivant.

Pour obtenir les autorisations nécessaires pour utiliser le lineage de données avec Dataproc, demandez à votre administrateur de vous accorder les rôles IAM suivants sur le compte de service personnalisé de votre cluster :

Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.

Activer la traçabilité des données Spark au niveau du projet

Vous pouvez activer le lineage des données Spark au niveau du projet. Les jobs Spark compatibles exécutés sur des clusters créés après l'activation de la traçabilité des données dans un projet seront associés à la traçabilité des données. Notez que les tâches exécutées sur des clusters existants (créés avant l'activation du lineage des données au niveau du projet) ne seront pas associées au lineage des données.

Activer la traçabilité des données Spark au niveau du projet

Pour activer le lineage des données Spark au niveau du projet, définissez les métadonnées personnalisées suivantes pour le projet :

Clé Valeur
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

Vous pouvez désactiver la traçabilité des données Spark au niveau du projet en définissant les métadonnées DATAPROC_LINEAGE_ENABLED sur false.

Activer la traçabilité des données Spark au niveau du cluster

Vous pouvez activer le lineage de données Spark lorsque vous créez un cluster. Ainsi, le lineage de données sera activé pour toutes les tâches Spark compatibles envoyées au cluster.

Activer la traçabilité des données Spark au niveau du cluster

Pour activer le lineage de données Spark sur un cluster, créez un cluster Dataproc avec la propriété de cluster dataproc:dataproc.lineage.enabled définie sur true.

Clusters de version d'image 2.0 : le champ d'application cloud-platform de l'accès aux VM du cluster Dataproc est requis pour la traçabilité des données Spark. Les clusters de version d'image Dataproc créés avec la version d'image 2.1 et ultérieure sont compatibles avec cloud-platform. Si vous spécifiez la version d'image Dataproc 2.0 lorsque vous créez un cluster, définissez le champ d'application sur cloud-platform.

Exemple de gcloud CLI :

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

Désactiver la traçabilité des données Spark pour une tâche

Si vous activez le lineage des données Spark au niveau du cluster, vous pouvez le désactiver pour un job spécifique en transmettant la propriété spark.extraListeners avec une valeur vide ("") lorsque vous envoyez le job.

Une fois activée, vous ne pouvez plus désactiver la traçabilité des données Spark sur le cluster. Pour éliminer la traçabilité des données Spark sur tous les jobs de cluster, vous pouvez recréer le cluster sans la propriété dataproc:dataproc.lineage.enabled.

Envoyer une tâche Spark

Lorsque vous envoyez une tâche Spark sur un cluster Dataproc créé avec l'option de traçabilité des données Spark activée, Dataproc capture et signale les informations de traçabilité des données à l'API Data Lineage.

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

Remarques :

  • L'ajout des propriétés spark.openlineage.namespace et spark.openlineage.appName, qui permettent d'identifier de manière unique le job, est facultatif. Si vous n'ajoutez pas ces propriétés, Dataproc utilise les valeurs par défaut suivantes :
    • Valeur par défaut pour spark.openlineage.namespace : PROJECT_ID
    • Valeur par défaut pour spark.openlineage.appName : spark.app.name

Afficher la traçabilité dans Dataplex Universal Catalog

Un graphique de traçabilité affiche les relations entre les ressources de votre projet et les processus qui les ont créées. Vous pouvez afficher les informations sur la traçabilité des données dans la console Google Cloud ou les récupérer à partir de l'API Data Lineage sous forme de données JSON.

Exemple de code PySpark :

Le job PySpark suivant lit les données d'une table BigQuery publique, puis écrit la sortie dans une nouvelle table d'un ensemble de données BigQuery existant. Il utilise un bucket Cloud Storage pour le stockage temporaire.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

Effectuez les remplacements suivants :

  • BUCKET : nom d'un bucket Cloud Storage existant.

  • PROJECT_ID, DATASET et TABLE : insérez votre ID de projet, le nom d'un ensemble de données BigQuery existant et le nom d'une nouvelle table à créer dans l'ensemble de données (la table ne doit pas exister).

Vous pouvez afficher le graphique de traçabilité dans l'interface utilisateur Dataplex Universal Catalog.

Exemple de graphique de traçabilité

Étapes suivantes