Usa el linaje de datos con Serverless para Apache Spark

En este documento, se describe cómo habilitar el linaje de datos en Google Cloud las cargas de trabajo por lotes y las sesiones interactivas de Serverless for Apache Spark a nivel del proyecto, la carga de trabajo por lotes o la sesión interactiva.

Descripción general

El linaje de datos es una función de Dataplex Universal Catalog que te permite hacer un seguimiento de cómo los datos se mueven a través de tus sistemas: de dónde provienen, a dónde se pasan y qué transformaciones se aplican a ellos.

Google Cloud Las cargas de trabajo y las sesiones de Serverless for Apache Spark capturan eventos de linaje y los publican en el Dataplex Universal Catalog API de Data Lineage. Serverless for Apache Spark se integra con la API de Data Lineage a través de OpenLineage, mediante el complemento OpenLineage Spark.

Puedes acceder a la información del linaje a través de Dataplex Universal Catalog, con gráficos de linaje y la API de Data Lineage. Para obtener más información, consulta Cómo ver gráficos de linaje en Dataplex Universal Catalog.

Disponibilidad, capacidades y limitaciones

El linaje de datos, que admite fuentes de datos de BigQuery y Cloud Storage, está disponible para las cargas de trabajo y las sesiones que se ejecutan con las versiones de entorno de ejecución de Serverless for Apache Spark 1.2, 2.2, 2.3, y 3.0, con las siguientes excepciones y limitaciones:

  • El linaje de datos no está disponible para las cargas de trabajo ni las sesiones de SparkR o Spark Streaming.

Antes de comenzar

  1. En la página del selector de proyectos de la Google Cloud console, selecciona el proyecto que se usará para tus cargas de trabajo o sesiones de Serverless for Apache Spark.

    Ir al selector de proyectos

  2. Habilita la API de Data Lineage.

    Habilitar las API

Roles obligatorios

Si tu carga de trabajo por lotes usa la cuenta de servicio predeterminada de Serverless for Apache Spark, tiene el rol Dataproc Worker, que habilita el linaje de datos. No es necesario realizar ninguna otra acción.

Sin embargo, si tu carga de trabajo por lotes usa una cuenta de servicio personalizada para habilitar el linaje de datos, debes otorgar un rol obligatorio a la cuenta de servicio personalizada como se explica en el siguiente párrafo.

Para obtener los permisos que necesitas para usar el linaje de datos con Dataproc, pídele a tu administrador que te otorgue los siguientes roles de IAM en tu cuenta de servicio personalizada de carga de trabajo por lotes:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios a través de roles personalizados o cualquier otro rol predefinido.

Habilita el linaje de datos a nivel del proyecto

Puedes habilitar el linaje de datos a nivel del proyecto. Cuando se habilita a nivel del proyecto, todas las cargas de trabajo por lotes y las sesiones interactivas posteriores que ejecutes en el proyecto tendrán habilitado el linaje de Spark.

Cómo habilitar el linaje de datos a nivel del proyecto

Para habilitar el linaje de datos a nivel del proyecto, configura los siguientes metadatos personalizados del proyecto.

Clave Valor
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

Para inhabilitar el linaje de datos a nivel del proyecto, configura los DATAPROC_LINEAGE_ENABLED metadatos en false.

Habilita el linaje de datos para una carga de trabajo por lotes de Spark

Puedes habilitar el linaje de datos en una carga de trabajo por lotes si configuras la propiedad spark.dataproc.lineage.enabled en true cuando envías la carga de trabajo.

Ejemplo de carga de trabajo por lotes

En este ejemplo, se envía una carga de trabajo por lotes lineage-example.py con el linaje de Spark habilitado.

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

lineage-example.py lee datos de una tabla pública de BigQuery y, luego, escribe el resultado en una tabla nueva en un conjunto de datos de BigQuery existente. Usa un bucket de Cloud Storage para el almacenamiento temporal.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

Realiza los siguientes reemplazos:

  • REGION: Selecciona una región para ejecutar tu carga de trabajo.

  • BUCKET: Es el nombre de un bucket de Cloud Storage existente para almacenar dependencias.

  • PROJECT_ID, DATASET, y TABLE: Inserta tu ID del proyecto, el nombre de un conjunto de datos de BigQuery existente y el nombre de una tabla nueva para crear en el conjunto de datos (la tabla no debe existir).

Puedes ver el gráfico de linaje en la IU de Dataplex Universal Catalog.

Gráfico de linaje de Spark

Habilita el linaje de datos para una sesión interactiva de Spark

Puedes habilitar el linaje de datos en una sesión interactiva de Spark si configuras la propiedad spark.dataproc.lineage.enabled en true cuando creas la sesión o la plantilla de sesión.

Ejemplo de sesión interactiva

El siguiente código de notebook de PySpark configura una sesión interactiva de Serverless for Apache Spark con el linaje de datos de Spark habilitado. Luego, crea una sesión de Spark Connect que ejecuta una consulta de recuento de palabras en un conjunto de datos público de Shakespeare de BigQuery y, luego, escribe el resultado en una tabla nueva en un conjunto de datos de BigQuery existente.

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

Realiza los siguientes reemplazos:

  • PROJECT_ID, DATASET, y TABLE: Inserta tu ID del proyecto, el nombre de un conjunto de datos de BigQuery existente y el nombre de una tabla nueva para crear en el conjunto de datos (la tabla no debe existir).

Para ver el gráfico de linaje de datos, haz clic en el nombre de la tabla de destino que aparece en el panel de navegación de la página Explorador de BigQuery y, luego, selecciona la pestaña de linaje en el panel de detalles de la tabla.

Gráfico de linaje de Spark

Consulta el linaje en Dataplex Universal Catalog

Un gráfico de linaje muestra las relaciones entre los recursos de tu proyecto y los procesos que los crearon. Puedes ver la información del linaje de datos en la Google Cloud console o recuperar la información de la API de Data Lineage como datos JSON.

¿Qué sigue?