Ce document explique comment activer la traçabilité des données pour les charges de travail par lot et les sessions interactivesGoogle Cloud Serverless pour Apache Spark au niveau du projet, de la charge de travail par lot ou de la session interactive.
Présentation
La traçabilité des données est une fonctionnalité Dataplex Universal Catalog qui vous permet de suivre la manière dont les données transitent par vos systèmes : leur origine, la cible de transmission, et les transformations qui leur sont appliquées.
Les charges de travail et les sessions Serverless pour Apache Spark capturent les événements de traçabilité et les publient dans l'API Data Lineage de Dataplex Universal Catalog.Google Cloud Serverless pour Apache Spark s'intègre à l'API Data Lineage via OpenLineage, à l'aide du plug-in OpenLineage Spark.
Vous pouvez accéder aux informations de traçabilité via Dataplex Universal Catalog, à l'aide des graphiques de traçabilité et de l'API Data Lineage. Pour en savoir plus, consultez Afficher les graphiques de traçabilité dans Dataplex Universal Catalog.
Disponibilité
La traçabilité des données, qui est compatible avec les sources de données BigQuery et Cloud Storage, est disponible pour les charges de travail et les sessions qui s'exécutent avec les versions d'exécution Serverless pour Apache Spark compatibles, à l'exception des éléments suivants :
- La lignée de données n'est pas disponible pour les charges de travail ni les sessions SparkR ou Spark Streaming.
Avant de commencer
Sur la page de sélection du projet dans la console Google Cloud , sélectionnez le projet à utiliser pour vos charges de travail ou sessions Serverless pour Apache Spark.
Activez l'API Data Lineage.
Modifications à venir concernant la traçabilité des données Spark Consultez les notes de version de Serverless pour Apache Spark pour en savoir plus sur une modification qui rendra automatiquement la traçabilité des données Spark disponible pour vos projets, charges de travail par lot et sessions interactives lorsque vous activerez l'API Data Lineage (voir Contrôler l'ingestion de la traçabilité pour un service). Vous n'aurez pas besoin de configurer d'autres paramètres pour les projets, les charges de travail par lot ou les sessions interactives.
Rôles requis
Si votre charge de travail par lot utilise le compte de service Serverless pour Apache Spark par défaut, elle dispose du rôle Dataproc Worker, qui contient les autorisations requises par le lineage des données.
Toutefois, si votre charge de travail par lot utilise un compte de service personnalisé pour activer le lineage des données, vous devez accorder l'un des rôles listés dans le paragraphe suivant au compte de service personnalisé. Ces rôles contiennent les autorisations requises par le lineage des données.
Pour obtenir les autorisations nécessaires pour utiliser le lineage de données avec Dataproc, demandez à votre administrateur de vous accorder les rôles IAM suivants sur le compte de service personnalisé de votre charge de travail par lot :
-
Attribuez l'un des rôles suivants :
-
Nœud de calcul Dataproc (
roles/dataproc.worker) -
Éditeur de traçabilité des données (
roles/datalineage.editor) -
Producteur de traçabilité des données (
roles/datalineage.producer) -
Administrateur de la traçabilité des données (
roles/datalineage.admin)
-
Nœud de calcul Dataproc (
Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.
Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.
Activer la traçabilité des données Spark
Vous pouvez activer la traçabilité des données Spark pour votre projet, votre charge de travail par lot ou votre session interactive.
Activer la traçabilité des données au niveau du projet
Une fois que vous avez activé le lineage des données Spark au niveau du projet, le lineage des données Spark est activé pour les tâches Spark ultérieures qui s'exécutent dans une charge de travail par lot ou une session interactive.
Pour activer la traçabilité des données Spark dans votre projet, définissez les métadonnées de projet personnalisées suivantes :
| Clé | Valeur |
|---|---|
DATAPROC_LINEAGE_ENABLED |
true |
Vous pouvez désactiver le lineage des données Spark pour un projet en définissant les métadonnées DATAPROC_LINEAGE_ENABLED sur false.
Activer la traçabilité des données sur une charge de travail par lot Spark
Pour activer le lineage des données sur une charge de travail par lot, définissez la propriété spark.dataproc.lineage.enabled sur true lorsque vous envoyez la charge de travail. Ce paramètre remplace tout paramètre de traçabilité des données Spark au niveau du projet : si la traçabilité des données Spark est désactivée au niveau du projet, mais activée pour la charge de travail par lot, le paramètre de la charge de travail par lot prévaut.
Vous pouvez désactiver le lineage des données Spark sur une charge de travail par lot Spark en définissant la propriété spark.dataproc.lineage.enabled sur false lorsque vous envoyez la charge de travail.
Cet exemple utilise gcloud CLI pour envoyer une charge de travail par lot lineage-example.py avec le traçage Spark activé.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
Le code lineage-example.py suivant lit les données d'une table BigQuery publique, puis écrit la sortie dans une nouvelle table d'un ensemble de données BigQuery existant. Il utilise un bucket Cloud Storage pour le stockage temporaire.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
Remplacez les éléments suivants :
- REGION : région dans laquelle exécuter la charge de travail.
- BUCKET : nom d'un bucket Cloud Storage existant dans lequel stocker les dépendances
- PROJECT_ID, DATASET et TABLE : ID du projet, nom d'un ensemble de données BigQuery existant et nom d'une nouvelle table à créer dans l'ensemble de données (la table ne doit pas exister)
Vous pouvez afficher le graphique de traçabilité dans l'interface utilisateur Dataplex Universal Catalog.
Activer la traçabilité des données sur une session interactive Spark ou un modèle de session
Pour activer le traçage des données dans une session interactive ou un modèle de session Spark, définissez la propriété spark.dataproc.lineage.enabled sur true lorsque vous créez la session ou le modèle de session. Ce paramètre remplace tout paramètre de lignée de données Spark au niveau du projet : si la lignée de données Spark est désactivée au niveau du projet, mais activée pour la session interactive, le paramètre de la session interactive est prioritaire.
Vous pouvez désactiver le lineage des données Spark dans une session interactive ou un modèle de session Spark en définissant la propriété spark.dataproc.lineage.enabled sur false lorsque vous créez la session interactive ou le modèle de session.
Le code de notebook PySpark suivant configure une session interactive Serverless pour Apache Spark avec la traçabilité des données Spark activée. Il crée ensuite une session Spark Connect qui exécute une requête de décompte de mots sur un ensemble de données Shakespeare public BigQuery, puis écrit la sortie dans une nouvelle table d'un ensemble de données BigQuery existant (voir Créer une session Spark dans un notebook BigQuery Studio) .
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
Remplacez les éléments suivants :
- PROJECT_ID, DATASET et TABLE : ID du projet, nom d'un ensemble de données BigQuery existant et nom d'une nouvelle table à créer dans l'ensemble de données (la table ne doit pas exister)
Pour afficher le graphique de traçabilité des données, cliquez sur le nom de la table de destination listée dans le volet de navigation de la page Explorateur de BigQuery, puis sélectionnez l'onglet "Traçabilité" dans le volet d'informations sur la table.
Afficher la traçabilité dans Dataplex Universal Catalog
Un graphique de traçabilité affiche les relations entre les ressources de votre projet et les processus qui les ont créées. Vous pouvez afficher les informations sur la traçabilité des données dans la console Google Cloud ou les récupérer à partir de l'API Data Lineage sous forme de données JSON.
Étapes suivantes
- Découvrez-en plus sur la traçabilité des données.
- Essayez le lineage des données dans un atelier interactif : Capture and Explore Data Updates With Data Lineage and OpenLineage in Dataplex (Capturer et explorer les mises à jour de données avec le lineage des données et OpenLineage dans Dataplex).