Questo documento descrive come attivare la derivazione dei dati sui Google Cloud workload batch e sulle sessioni interattive di Serverless per Apache Spark a livello di progetto, workload batch o sessione interattiva.
Panoramica
La derivazione dei dati è una funzionalità di Dataplex Universal Catalog che consente di monitorare il modo in cui i dati vengono trasferiti nei sistemi: da dove provengono, dove vengono inviati e a quali trasformazioni sono sottoposti.
Google Cloud I workload e le sessioni di Serverless per Apache Spark acquisiscono gli eventi di derivazione e li pubblicano nell'API Data Lineage del catalogo universale Dataplex. Serverless per Apache Spark si integra con l'API Data Lineage tramite OpenLineage, utilizzando il plug-in OpenLineage Spark.
Puoi accedere alle informazioni sulla derivazione tramite Dataplex Universal Catalog, utilizzando i grafici di derivazione e l'API Data Lineage. Per saperne di più, consulta Visualizzare i grafici di derivazione in Dataplex Universal Catalog.
Disponibilità
La lineage dei dati, che supporta le origini dati BigQuery e Cloud Storage, è disponibile per i carichi di lavoro e le sessioni eseguiti con le versioni del runtime Serverless per Apache Spark supportate con le seguenti eccezioni e limitazioni:
- La derivazione dei dati non è disponibile per i carichi di lavoro o le sessioni SparkR o Spark Streaming.
Prima di iniziare
Nella pagina di selezione del progetto nella console Google Cloud , seleziona il progetto da utilizzare per i workload o le sessioni di Serverless per Apache Spark.
Abilita l'API Data Lineage.
Modifiche imminenti alla lineage dei dati Spark Consulta le note di rilascio di Serverless per Apache Spark per l'annuncio di una modifica che renderà automaticamente la lineage dei dati Spark disponibile per i tuoi progetti, workload batch e sessioni interattive quando abiliti l'API Data Lineage (vedi Controllare l'importazione della lineage per un servizio) senza richiedere impostazioni aggiuntive per progetti, workload batch o sessioni interattive.
Ruoli obbligatori
Se il tuo workload batch utilizza l'account di servizio Serverless per Apache Spark predefinito, dispone del ruolo Dataproc Worker, che contiene le autorizzazioni richieste dalla lineage dei dati.
Tuttavia, se il tuo batch utilizza un account di servizio personalizzato per attivare la derivazione dei dati, devi concedere uno dei ruoli elencati nel paragrafo seguente, che contengono le autorizzazioni richieste dalla derivazione dei dati, account di serviziount personalizzato.
Per ottenere le autorizzazioni necessarie per utilizzare la derivazione dei dati con Dataproc, chiedi all'amministratore di concederti i seguenti ruoli IAM sul account di servizio personalizzato del carico di lavoro batch:
-
Concedi uno dei seguenti ruoli:
-
Dataproc Worker (
roles/dataproc.worker) -
Editor Data Lineage (
roles/datalineage.editor) -
Data Lineage Producer (
roles/datalineage.producer) -
Amministratore di Data Lineage (
roles/datalineage.admin)
-
Dataproc Worker (
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.
Abilitare la derivazione dei dati Spark
Puoi attivare la derivazione dei dati Spark per il tuo progetto, il tuo carico di lavoro batch o la tua sessione interattiva.
Abilitare la derivazione dei dati a livello di progetto
Dopo aver attivato la derivazione dei dati Spark a livello di progetto, i successivi job Spark eseguiti in un carico di lavoro batch o in una sessione interattiva avranno la derivazione dei dati Spark attivata.
Per attivare la derivazione dei dati Spark nel tuo progetto, imposta i seguenti metadati di progetto personalizzati:
| Chiave | Valore |
|---|---|
DATAPROC_LINEAGE_ENABLED |
true |
Puoi disattivare la derivazione dei dati Spark per un progetto impostando i metadati DATAPROC_LINEAGE_ENABLED su false.
Abilitare la derivazione dei dati in un workload batch Spark
Per attivare la derivazione dei dati su un workload batch,
imposta la proprietà spark.dataproc.lineage.enabled su true quando invii il workload. Questa impostazione sostituisce qualsiasi impostazione della lineage dei dati Spark
a livello di progetto: se la lineage dei dati Spark
è disattivata a livello di progetto, ma attivata per il workload batch, l'impostazione del workload batch ha la precedenza.
Puoi disattivare la derivazione dei dati Spark in un carico di lavoro batch Spark
impostando la proprietà spark.dataproc.lineage.enabled su false
quando invii il carico di lavoro.
Questo esempio utilizza gcloud CLI per inviare un batch
lineage-example.py con la derivazione Spark abilitata.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
Il seguente codice lineage-example.py legge i dati da una tabella BigQuery pubblica e poi scrive l'output in una nuova tabella in un set di dati BigQuery esistente. Utilizza un bucket Cloud Storage per l'archiviazione temporanea.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
Sostituisci quanto segue:
- REGION: la regione in cui eseguire il workload
- BUCKET: il nome di un bucket Cloud Storage esistente in cui archiviare le dipendenze
- PROJECT_ID, DATASET e TABLE: l'ID progetto, il nome di un set di dati BigQuery esistente e il nome di una nuova tabella da creare nel set di dati (la tabella non deve esistere)
Puoi visualizzare il grafico della derivazione nella UI di Dataplex Universal Catalog.
Abilitare la derivazione dei dati in una sessione interattiva Spark o in un modello di sessione
Per attivare la derivazione dei dati in una sessione interattiva Spark o in un modello di sessione,
imposta la proprietà spark.dataproc.lineage.enabled su true quando
crei la sessione o il modello di sessione. Questa impostazione sostituisce qualsiasi impostazione di derivazione dei dati Spark
a livello di progetto: se la derivazione dei dati Spark
è disattivata a livello di progetto, ma attivata per la sessione interattiva, l'impostazione della sessione interattiva ha la precedenza.
Puoi disattivare la derivazione dei dati Spark in una sessione interattiva o in un modello di sessione Spark
impostando la proprietà spark.dataproc.lineage.enabled su false
quando crei la sessione interattiva o il modello di sessione.
Il seguente codice del notebook PySpark configura una sessione interattiva di Serverless per Apache Spark con la derivazione dei dati Spark abilitata. Poi crea una sessione Spark Connect che esegue una query di conteggio parole su un set di dati pubblico di BigQuery Shakespeare e scrive l'output in una nuova tabella in un set di dati BigQuery esistente (vedi Creare una sessione Spark in un notebook BigQuery Studio) .
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
Sostituisci quanto segue:
- PROJECT_ID, DATASET e TABLE: l'ID progetto, il nome di un set di dati BigQuery esistente e il nome di una nuova tabella da creare nel set di dati (la tabella non deve esistere)
Puoi visualizzare il grafico della derivazione dei dati facendo clic sul nome della tabella di destinazione elencato nel riquadro di navigazione della pagina Esplora di BigQuery, quindi selezionando la scheda Derivazione nel riquadro dei dettagli della tabella.
Visualizza la derivazione nel Catalogo universale Dataplex
Un grafico di derivazione mostra le relazioni tra le risorse del progetto e i processi che le hanno create. Puoi visualizzare le informazioni sulla derivazione dei dati nella console Google Cloud o recuperare le informazioni dall'API Data Lineage come dati JSON.
Passaggi successivi
- Scopri di più sulla derivazione dei dati.
- Prova la derivazione dei dati in un lab interattivo: Acquisizione ed esplorazione degli aggiornamenti dei dati con la derivazione dei dati e OpenLineage in Dataplex.