Abilita la derivazione dei dati Spark in Dataproc

Questo documento descrive come abilitare la derivazione dei dati per i job Spark di Dataproc a livello di progetto o di cluster.

La derivazione dei dati è una funzionalità di Dataplex Universal Catalog che consente di monitorare il modo in cui i dati vengono trasferiti nei sistemi: da dove provengono, dove vengono inviati e a quali trasformazioni sono sottoposti.

La derivazione dei dati è disponibile per tutti i job Spark di Dataproc ad eccezione dei job SparkR e Spark Streaming, e supporta le origini dati BigQuery e Cloud Storage. È inclusa nelle versioni delle immagini Dataproc su Compute Engine 2.0.74+, 2.1.22+, 2.2.50 e successive.

Una volta abilitata la funzionalità nel cluster Dataproc, i job Spark di Dataproc acquisiscono gli eventi di derivazione dei dati e li pubblicano nell'API Data Lineage di Dataplex Universal Catalog Data Lineage. Dataproc si integra con l'API Data Lineage tramite OpenLineage, utilizzando il plug-in OpenLineage Spark.

Puoi accedere alle informazioni sulla derivazione dei dati tramite Dataplex Universal Catalog, utilizzando quanto segue:

Prima di iniziare

  1. Nella Google Cloud console, nella pagina di selezione del progetto, seleziona il progetto che contiene il cluster Dataproc di cui vuoi monitorare la derivazione.

    Vai al selettore di progetti

  2. Abilita l'API Data Lineage.

    Abilita le API

Ruoli obbligatori

Se crei un cluster Dataproc utilizzando l' account di servizio VM predefinito, questo ha il ruolo Dataproc Worker, che abilita la derivazione dei dati. Non sono necessarie ulteriori operazioni:

Tuttavia, se crei un cluster Dataproc che utilizza un account di servizio personalizzato, per abilitare la derivazione dei dati sul cluster, devi concedere un ruolo obbligatorio a tale account di servizio personalizzato, come spiegato nel paragrafo seguente.

Per ottenere le autorizzazioni necessarie per utilizzare la derivazione dei dati con Dataproc, chiedi all'amministratore di concederti i seguenti ruoli IAM sull'account di servizio personalizzato del cluster:

Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Abilitare la derivazione dei dati Spark a livello di progetto

Puoi abilitare la derivazione dei dati Spark a livello di progetto. I job Spark supportati eseguiti sui cluster creati dopo l'abilitazione della derivazione dei dati in un progetto avranno la derivazione dei dati abilitata. Tieni presente che i job eseguiti sui cluster esistenti, ovvero i cluster creati prima dell'abilitazione della derivazione dei dati a livello di progetto, non avranno la derivazione dei dati abilitata.

Abilitare la derivazione dei dati Spark a livello di progetto

Per abilitare la derivazione dei dati Spark a livello di progetto, imposta i seguenti metadati di progetto personalizzati:

Chiave Valore
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

Puoi disabilitare la derivazione dei dati Spark a livello di progetto impostando i DATAPROC_LINEAGE_ENABLED metadati su false.

Abilitare la derivazione dei dati Spark a livello di cluster

Puoi abilitare la derivazione dei dati Spark quando crei un cluster in modo che tutti i job Spark supportati inviati al cluster abbiano la derivazione dei dati abilitata.

Abilitare la derivazione dei dati Spark a livello di cluster

Per abilitare la derivazione dei dati Spark su un cluster, crea un cluster Dataproc con la proprietà del cluster dataproc:dataproc.lineage.enabled impostata su true.

Cluster con versione dell'immagine 2.0: per la derivazione dei dati Spark è necessario l'ambito di accesso alla VM del cluster Dataproc cloud-platform scope. I cluster con versione dell'immagine Dataproc creati con la versione dell'immagine 2.1 e successive hanno cloud-platform abilitato. Se specifichi la versione dell'immagine Dataproc 2.0 quando crei un cluster, imposta l' ambito su cloud-platform.

Esempio dell'interfaccia a riga di comando gcloud:

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

Disabilitare la derivazione dei dati Spark su un job

Se abiliti la derivazione dei dati Spark a livello di cluster, puoi disabilitare la derivazione dei dati Spark su un job specifico passando la spark.extraListeners proprietà con un valore vuoto ("") quando invii il job.

Una volta abilitata, non puoi disabilitare la derivazione dei dati Spark sul cluster. Per eliminare la derivazione dei dati Spark su tutti i job del cluster, puoi ricreare il cluster senza la proprietà dataproc:dataproc.lineage.enabled.

Invia un job Spark

Quando invii un job Spark su un cluster Dataproc creato con la derivazione dei dati Spark abilitata, Dataproc acquisisce e segnala le informazioni sulla derivazione dei dati all' API Data Lineage.

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

Note:

  • L'aggiunta delle proprietà spark.openlineage.namespace e spark.openlineage.appName, utilizzate per identificare in modo univoco il job, è facoltativa. Se non aggiungi queste proprietà, Dataproc utilizza i seguenti valori predefiniti:
    • Valore predefinito per spark.openlineage.namespace: PROJECT_ID
    • Valore predefinito per spark.openlineage.appName: spark.app.name

Visualizzare la derivazione nel Catalogo universale Dataplex

Un grafico di derivazione mostra le relazioni tra le risorse del progetto e i processi che le hanno create. Puoi visualizzare le informazioni sulla derivazione dei dati nella Google Cloud console o recuperarle dall'API Data Lineage sotto forma di dati JSON.

Esempio di codice PySpark:

Il seguente job PySpark legge i dati da una tabella BigQuery pubblica e poi scrive l'output in una nuova tabella in un set di dati BigQuery esistente. Utilizza un bucket Cloud Storage per l'archiviazione temporanea.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

Assicurati di sostituire quanto segue:

  • BUCKET: il nome di un bucket Cloud Storage esistente.

  • PROJECT_ID, DATASET, e TABLE: inserisci l'ID progetto , il nome di un set di dati BigQuery esistente e il nome di una nuova tabella da creare nel set di dati (la tabella non deve esistere).

Puoi visualizzare il grafico di derivazione nell'interfaccia utente di Dataplex Universal Catalog.

Grafico di derivazione di esempio

Passaggi successivi