Questo documento descrive come abilitare la derivazione dei dati su Google Cloud Serverless per i workload batch e le sessioni interattive di Apache Spark a livello di progetto, workload batch o sessione interattiva.
Panoramica
La derivazione dei dati è una funzionalità di Dataplex Universal Catalog che consente di monitorare il modo in cui i dati vengono trasferiti nei sistemi: da dove provengono, dove vengono inviati e a quali trasformazioni sono sottoposti.
Google Cloud I workload e le sessioni di Serverless per Apache Spark acquisiscono gli eventi di derivazione e li pubblicano in Dataplex Universal Catalog API Data Lineage. Serverless per Apache Spark si integra con l'API Data Lineage tramite OpenLineage, utilizzando il plug-in OpenLineage Spark.
Puoi accedere alle informazioni sulla derivazione tramite Dataplex Universal Catalog, utilizzando i grafici di derivazione e l'API Data Lineage. Per saperne di più, consulta Visualizzare i grafici di derivazione in Dataplex Universal Catalog.
Disponibilità, funzionalità e limitazioni
La derivazione dei dati, che supporta le origini dati BigQuery e Cloud Storage
è disponibile per i workload e le sessioni eseguiti con
Serverless per Apache Spark versioni di runtime 1.2, 2.2, 2.3, e 3.0,
con le seguenti eccezioni e limitazioni:
- La derivazione dei dati non è disponibile per i workload o le sessioni SparkR o Spark Streaming.
Prima di iniziare
Nella pagina di selezione del progetto della Google Cloud console, seleziona il progetto da utilizzare per i workload o le sessioni di Serverless per Apache Spark.
Abilita l'API Data Lineage.
Ruoli obbligatori
Se il workload batch utilizza il
default service account predefinito di Serverless per Apache Spark,
ha il ruolo Dataproc Worker, che abilita la derivazione dei dati. Non sono necessarie ulteriori operazioni:
Tuttavia, se il workload batch utilizza un service account personalizzato per abilitare la derivazione dei dati, devi concedere un ruolo obbligatorio al service account personalizzato, come spiegato nel paragrafo seguente.
Per ottenere le autorizzazioni necessarie per utilizzare la derivazione dei dati con Dataproc, chiedi all'amministratore di concederti i seguenti ruoli IAM sul service account personalizzato del workload batch:
-
Concedi uno dei seguenti ruoli:
-
Dataproc Worker (
roles/dataproc.worker) -
Editor derivazione dei dati (
roles/datalineage.editor) -
Producer derivazione dei dati (
roles/datalineage.producer) -
Amministratore derivazione dei dati (
roles/datalineage.admin)
-
Dataproc Worker (
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.
Abilitare la derivazione dei dati a livello di progetto
Puoi abilitare la derivazione dei dati a livello di progetto. Se abilitata a livello di progetto, tutti i workload batch e le sessioni interattive successivi eseguiti nel progetto avranno la derivazione di Spark abilitata.
Come abilitare la derivazione dei dati a livello di progetto
Per abilitare la derivazione dei dati a livello di progetto, imposta i seguenti metadati di progetto personalizzati.
| Chiave | Valore |
|---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
Puoi disabilitare la derivazione dei dati a livello di progetto impostando i
DATAPROC_LINEAGE_ENABLED metadati su false.
Abilitare la derivazione dei dati per un workload batch di Spark
Puoi abilitare la derivazione dei dati su un workload batch
impostando la proprietà spark.dataproc.lineage.enabled su true quando
invii il workload.
Esempio di workload batch
Questo esempio invia un workload batch lineage-example.py con la derivazione di Spark
abilitata.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
lineage-example.py legge i dati da una tabella BigQuery
pubblica e poi scrive l'output in una nuova tabella in un set di dati BigQuery
esistente. Utilizza un bucket Cloud Storage per l'archiviazione temporanea.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
Esegui le seguenti sostituzioni:
REGION: seleziona una regione in cui eseguire il workload.
BUCKET: il nome di un bucket Cloud Storage esistente in cui archiviare le dipendenze.
PROJECT_ID, DATASET, e TABLE: inserisci l'ID progetto , il nome di un set di dati BigQuery esistente e il nome di una nuova tabella da creare nel set di dati (la tabella non deve esistere).
Puoi visualizzare il grafico di derivazione nell'interfaccia utente di Dataplex Universal Catalog.
Abilitare la derivazione dei dati per una sessione interattiva di Spark
Puoi abilitare la derivazione dei dati su una sessione interattiva di Spark
impostando la proprietà spark.dataproc.lineage.enabled su true quando
crei la sessione o il modello di sessione.
Esempio di sessione interattiva
Il seguente codice del notebook PySpark configura una sessione interattiva di Serverless per Apache Spark con la derivazione dei dati di Spark abilitata. Poi crea una sessione Spark Connect che esegue una query di conteggio parole su un set di dati BigQuery Shakespeare pubblico e poi scrive l'output in una nuova tabella in un set di dati BigQuery esistente.
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
Esegui le seguenti sostituzioni:
- PROJECT_ID, DATASET, e TABLE: inserisci l'ID progetto , il nome di un set di dati BigQuery esistente e il nome di una nuova tabella da creare nel set di dati (la tabella non deve esistere).
Puoi visualizzare il grafico di derivazione dei dati facendo clic sul nome della tabella di destinazione elencato nel riquadro di navigazione nella pagina Explorer di BigQuery, quindi selezionando la scheda di derivazione nel riquadro dei dettagli della tabella.
Visualizzare la derivazione in Dataplex Universal Catalog
Un grafico di derivazione mostra le relazioni tra le risorse del progetto e i processi che le hanno create. Puoi visualizzare le informazioni sulla derivazione dei dati nella Google Cloud console o recuperare le informazioni dall' API Data Lineage come dati JSON.
Passaggi successivi
- Scopri di più sulla derivazione dei dati.
- Provala in un lab interattivo: Acquisire ed esplorare gli aggiornamenti dei dati con la derivazione dei dati e OpenLineage