Questo documento descrive come abilitare la derivazione dei dati per i workload batch e le sessioni interattive di Managed Service for Apache Spark a livello di progetto, workload batch o sessione interattiva.
Panoramica
La derivazione dei dati è una funzionalità di Knowledge Catalog che consente di monitorare il modo in cui i dati vengono trasferiti nei sistemi: da dove provengono, dove vengono inviati e a quali trasformazioni sono sottoposti.
I workload e le sessioni di Managed Service for Apache Spark acquisiscono gli eventi di derivazione e li pubblicano in Knowledge Catalog Data Lineage API. Managed Service for Apache Spark si integra con l'API Data Lineage tramite OpenLineage, utilizzando il plug-in OpenLineage Spark.
Puoi accedere alle informazioni sulla derivazione tramite Knowledge Catalog, utilizzando i grafici di derivazione e l'API Data Lineage. Per saperne di più, consulta Visualizzare i grafici di derivazione in Knowledge Catalog.
Disponibilità
La derivazione dei dati, che supporta le origini dati BigQuery e Cloud Storage è disponibile per i workload e le sessioni eseguiti con le versioni di runtime di Managed Service for Apache Spark supportate con le seguenti eccezioni e limitazioni:
- La derivazione dei dati non è disponibile per i workload o le sessioni SparkR o Spark Streaming.
Prima di iniziare
Nella pagina di selezione del progetto della Google Cloud console, seleziona il progetto da utilizzare per i workload o le sessioni di Managed Service for Apache Spark.
Abilita l'API Data Lineage.
Modifiche imminenti alla derivazione dei dati di Spark Consulta le note di rilascio di Managed Service for Apache Spark per l'annuncio di una modifica che renderà automaticamente disponibile la derivazione dei dati di Spark per i progetti, i workload batch e le sessioni interattive quando abiliti l'API Data Lineage (vedi Controllare l'importazione della derivazione per un servizio) senza richiedere ulteriori impostazioni di progetto, workload batch o sessione interattiva.
Ruoli obbligatori
Se il workload batch utilizza l'account di servizio predefinito di Managed Service for Apache Spark, ha il ruolo Managed Service for Apache Spark Worker, che contiene le autorizzazioni richieste dalla derivazione dei dati.
Tuttavia, se il workload batch utilizza un account di servizio personalizzato per abilitare la derivazione dei dati, devi concedere all'account di servizio personalizzato uno dei ruoli elencati nel paragrafo seguente, che contengono le autorizzazioni richieste dalla derivazione dei dati.
Per ottenere le autorizzazioni necessarie per utilizzare la derivazione dei dati con Managed Service for Apache Spark, chiedi all'amministratore di concederti i seguenti ruoli IAM sull'account di servizio personalizzato del workload batch:
-
Concedi uno dei seguenti ruoli:
-
Managed Service for Apache Spark Worker (
roles/dataproc.worker) -
Data Lineage Editor (
roles/datalineage.editor) -
Data Lineage Producer (
roles/datalineage.producer) -
Data Lineage Administrator (
roles/datalineage.admin)
-
Managed Service for Apache Spark Worker (
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.
Abilitare la derivazione dei dati di Spark
Puoi abilitare la derivazione dei dati di Spark per il progetto, il workload batch o la sessione interattiva.
Abilitare la derivazione dei dati a livello di progetto
Dopo aver abilitato la derivazione dei dati di Spark a livello di progetto, i job Spark successivi eseguiti in un workload batch o in una sessione interattiva avranno la derivazione dei dati di Spark abilitata.
Per abilitare la derivazione dei dati di Spark nel tuo progetto, imposta i seguenti metadati di progetto personalizzati:
| Chiave | Valore |
|---|---|
DATAPROC_LINEAGE_ENABLED |
true |
Puoi disabilitare la derivazione dei dati di Spark per un progetto impostando i metadati DATAPROC_LINEAGE_ENABLED su false.
Abilitare la derivazione dei dati in un workload batch di Spark
Per abilitare la derivazione dei dati in un workload batch, imposta la proprietà spark.dataproc.lineage.enabled su true quando invii il workload. Questa impostazione sostituisce qualsiasi impostazione di derivazione dei dati di Spark
a livello di progetto: se la derivazione dei dati di Spark è disabilitata a livello di progetto, ma abilitata per il workload batch,
l'impostazione del workload batch ha la precedenza.
Puoi disabilitare la derivazione dei dati di Spark in un workload batch di Spark impostando la proprietà spark.dataproc.lineage.enabled su false quando invii il workload.
Questo esempio utilizza la gcloud CLI per inviare un workload batch lineage-example.py con la derivazione di Spark abilitata.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
Il seguente codice lineage-example.py legge i dati da una tabella BigQuery pubblica e poi scrive l'output in una nuova tabella in un set di dati BigQuery esistente. Utilizza un bucket Cloud Storage per l'archiviazione temporanea.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
Sostituisci quanto segue:
- REGION: la regione in cui eseguire il workload
- BUCKET: il nome di un bucket Cloud Storage esistente in cui archiviare le dipendenze
- PROJECT_ID, DATASET, e TABLE: l' ID progetto, il nome di un set di dati BigQuery esistente e il nome di una nuova tabella da creare nel set di dati (la tabella non deve esistere)
Puoi visualizzare il grafico di derivazione nell'interfaccia utente di Knowledge Catalog.
Abilitare la derivazione dei dati in una sessione interattiva o in un modello di sessione di Spark
Per abilitare la derivazione dei dati in una sessione interattiva o in un modello di sessione di Spark,
imposta la proprietà spark.dataproc.lineage.enabled su true quando
crei la sessione o il modello di sessione. Questa impostazione sostituisce qualsiasi impostazione di derivazione dei dati di Spark
a livello di progetto: se la derivazione dei dati di Spark
è disabilitata a livello di progetto, ma abilitata per la sessione interattiva,
l'impostazione della sessione interattiva ha la precedenza.
Puoi disabilitare la derivazione dei dati di Spark in una sessione interattiva o in un modello di sessione di Spark impostando la proprietà spark.dataproc.lineage.enabled su false quando crei la sessione interattiva o il modello di sessione.
Il seguente codice del notebook PySpark configura una sessione interattiva di Managed Service for Apache Spark con la derivazione dei dati di Spark abilitata. Poi crea una sessione Spark Connect che esegue una query di conteggio parole su un set di dati pubblico di BigQuery Shakespeare e poi scrive l'output in una nuova tabella in un set di dati BigQuery esistente (vedi Creare una sessione Spark in un notebook di BigQuery Studio) .
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
Sostituisci quanto segue:
- PROJECT_ID, DATASET, e TABLE: l' ID progetto, il nome di un set di dati BigQuery esistente e il nome di una nuova tabella da creare nel set di dati (la tabella non deve esistere)
Puoi visualizzare il grafico di derivazione dei dati facendo clic sul nome della tabella di destinazione elencato nel riquadro di navigazione nella pagina Explorer di BigQuery, quindi selezionando la scheda di derivazione nel riquadro dei dettagli della tabella.
Visualizzare la derivazione in Knowledge Catalog
Un grafico di derivazione mostra le relazioni tra le risorse del progetto e i processi che le hanno create. Puoi visualizzare le informazioni sulla derivazione dei dati nella Google Cloud console o recuperare le informazioni dall' API Data Lineage come dati JSON.
Passaggi successivi
- Scopri di più sulla derivazione dei dati.
- Prova la derivazione dei dati in un lab interattivo: Acquisire ed esplorare gli aggiornamenti dei dati con Data Lineage e OpenLineage in Dataplex.