Utilizzare la derivazione dei dati Spark

Questo documento descrive come attivare la derivazione dei dati per i job Spark di Managed Service for Apache Spark a livello di progetto o cluster.

La derivazione dei dati è una funzionalità di Knowledge Catalog che consente di monitorare il modo in cui i dati vengono trasferiti nei sistemi: da dove provengono, dove vengono inviati e a quali trasformazioni sono sottoposti.

La derivazione dei dati è disponibile per tutti i job Spark di Managed Service for Apache Spark, ad eccezione dei job SparkR e Spark streaming, e supporta le origini dati BigQuery e Cloud Storage. È incluso nelle versioni delle immagini di Managed Service for Apache Spark 2.0.74+, 2.1.22+, 2.2.50+, 2.3.1+ e 3.0.

Una volta abilitata la funzionalità nel cluster Managed Service for Apache Spark, i job Spark di Managed Service for Apache Spark acquisiscono gli eventi di derivazione dei dati e li pubblicano nell'API Data Lineage di Knowledge Catalog. Managed Service for Apache Spark si integra con l'API Data Lineage tramite OpenLineage, utilizzando il plug-in OpenLineage Spark.

Puoi accedere alle informazioni sulla derivazione dei dati tramite Knowledge Catalog utilizzando quanto segue:

Prima di iniziare

  1. Nella console Google Cloud , nella pagina di selezione del progetto, seleziona il progetto che contiene il cluster Managed Service for Apache Spark per cui vuoi monitorare la derivazione.

    Vai al selettore di progetti

  2. Abilita l'API Data Lineage.

    Abilita le API

    Modifiche imminenti alla lineage dei dati Spark Consulta le note di rilascio di Managed Service for Apache Spark per l'annuncio di una modifica che renderà automaticamente la lineage dei dati Spark disponibile per i tuoi progetti e cluster quando attivi l'API Data Lineage (vedi Controllare l'importazione della lineage per un servizio) senza richiedere impostazioni aggiuntive a livello di progetto o cluster.

Ruoli obbligatori

Se crei un cluster Managed Service for Apache Spark utilizzando il service account VM predefinito, questo ha il ruolo Managed Service for Apache Spark Worker, che attiva la derivazione dei dati. Non sono necessarie ulteriori azioni.

Tuttavia, se crei un cluster Managed Service for Apache Spark che utilizza un service account personalizzato, per attivare la lineage dei dati sul cluster, devi concedere un ruolo richiesto al account di servizio personalizzato, come spiegato nel paragrafo seguente.

Per ottenere le autorizzazioni necessarie per utilizzare la derivazione dei dati con Managed Service for Apache Spark, chiedi all'amministratore di concederti i seguenti ruoli IAM sull'account di servizio personalizzato del cluster:

Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Abilitare la derivazione dei dati Spark

Puoi abilitare la derivazione dei dati Spark a livello di progetto o cluster.

Abilita la derivazione dei dati Spark a livello di progetto

Dopo aver abilitato la derivazione dei dati Spark a livello di progetto, i successivi job Spark eseguiti sui cluster Managed Service for Apache Spark nel progetto avranno la derivazione dei dati Spark abilitata.

Per attivare la derivazione dei dati Spark a livello di progetto, imposta i seguenti metadati di progetto personalizzati:

Chiave Valore
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform
L'impostazione di questo ambito di accesso VM è necessaria solo per i cluster con versione immagine 2.0. Viene impostato automaticamente sui cluster con versione immagine 2.1 e successive.

Puoi disattivare la derivazione dei dati Spark a livello di progetto impostando i metadati DATAPROC_LINEAGE_ENABLED su false.

Abilita la derivazione dei dati Spark a livello di cluster

Se abiliti la derivazione dei dati Spark quando crei un cluster, i job Spark supportati eseguiti sui cluster Managed Service for Apache Spark avranno la derivazione dei dati Spark abilitata. Questa impostazione sostituisce qualsiasi impostazione della derivazione dei dati Spark a livello di progetto: se la derivazione dei dati Spark è disattivata a livello di progetto, ma attivata a livello di cluster, il livello di cluster ha la precedenza e la derivazione dei dati sarà attivata per i job Spark supportati eseguiti sul cluster.

Per attivare la derivazione dei dati Spark su un cluster, crea un cluster Managed Service for Apache Spark con la proprietà del cluster dataproc:dataproc.lineage.enabled impostata su true.

Esempio di gcloud CLI:

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

Puoi disattivare la derivazione dei dati Spark su un cluster impostando la proprietà dataproc:dataproc.lineage.enabled su false quando crei il cluster.

  • Disattivazione della derivazione dei dati su un cluster: per creare un cluster con la derivazione disattivata, imposta dataproc:dataproc.lineage.enabled=false. Dopo la creazione del cluster, non puoi disattivare la derivazione dei dati Spark sul cluster. Per disabilitare la derivazione dei dati Spark su un cluster esistente, puoi ricreare il cluster con la proprietà dataproc:dataproc.lineage.enabled impostata su false.

  • Imposta l'ambito sui cluster con versione immagine 2.0:cluster Managed Service for Apache Spark Accesso VM L'ambito cloud-platform è obbligatorio per la derivazione dei dati Spark. I cluster Managed Service for Apache Spark creati con la versione dell'immagine 2.1 e successive hanno cloud-platform abilitato. Se specifichi la versione dell'immagine Managed Service for Apache Spark 2.0 quando crei un cluster, imposta l'ambito su cloud-platform.

Disabilitare la derivazione dei dati Spark in un job

Se la derivazione dei dati Spark è abilitata su un cluster, puoi disabilitarla in un job passando la proprietà spark.extraListeners con un valore vuoto ("") quando invii il job.

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.extraListeners=''

Invia un job Spark

Quando invii un job Spark supportato su un cluster Managed Service for Apache Spark creato con la derivazione dei dati Spark abilitata, Managed Service for Apache Spark acquisisce e segnala le informazioni sulla derivazione dei dati all'API Data Lineage.

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

Note:

  • L'aggiunta delle proprietà spark.openlineage.namespace e spark.openlineage.appName, che vengono utilizzate per identificare in modo univoco il job, è facoltativa. Se non aggiungi queste proprietà, Managed Service for Apache Spark utilizza i seguenti valori predefiniti:
    • Valore predefinito per spark.openlineage.namespace: PROJECT_ID
    • Valore predefinito per spark.openlineage.appName: spark.app.name

Visualizza la derivazione in Knowledge Catalog

Un grafico di derivazione mostra le relazioni tra le risorse del progetto e i processi che le hanno create. Puoi visualizzare le informazioni sulla derivazione dei dati nella console Google Cloud o recuperarle dall'API Data Lineage sotto forma di dati JSON.

Codice di esempio PySpark:

Il seguente job PySpark legge i dati da una tabella BigQuery pubblica e poi scrive l'output in una nuova tabella in un set di dati BigQuery esistente. Utilizza un bucket Cloud Storage per l'archiviazione temporanea.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

Effettua le seguenti sostituzioni:

  • BUCKET: Il nome di un bucket Cloud Storage esistente

  • PROJECT_ID, DATASET e TABLE: l'ID progetto, il nome di un set di dati BigQuery esistente e il nome di una nuova tabella da creare nel set di dati (la tabella non deve esistere)

Puoi visualizzare il grafico della derivazione nella UI di Knowledge Catalog.

Grafico di derivazione di esempio

Passaggi successivi