En este documento, se describe cómo habilitar el linaje de datos en las cargas de trabajo por lotes y las sesiones interactivas de Managed Service para Apache Spark a nivel del proyecto, la carga de trabajo por lotes o la sesión interactiva.
Descripción general
El linaje de datos es una función de Knowledge Catalog que te permite hacer un seguimiento de cómo los datos se mueven a través de tus sistemas: de dónde provienen, a dónde se pasan y qué transformaciones se aplican a ellos.
Las cargas de trabajo y las sesiones de Managed Service para Apache Spark capturan eventos de linaje y los publican en la API de Data Lineage de Knowledge Catalog. Managed Service para Apache Spark se integra con la API de Data Lineage a través de OpenLineage, con el complemento de OpenLineage para Spark.
Puedes acceder a la información del linaje a través de Knowledge Catalog, con gráficos de linaje y la API de Data Lineage. Para obtener más información, consulta Cómo ver gráficos de linaje en Knowledge Catalog.
Disponibilidad
El linaje de datos, que admite fuentes de datos de BigQuery y Cloud Storage, está disponible para las cargas de trabajo y las sesiones que se ejecutan con versiones de tiempo de ejecución compatibles de Managed Service para Apache Spark, con las siguientes excepciones y limitaciones:
- El linaje de datos no está disponible para las cargas de trabajo ni las sesiones de SparkR o Spark Streaming.
Antes de comenzar
En la página del selector de proyectos de la consola de Google Cloud , selecciona el proyecto que deseas usar para tus cargas de trabajo o sesiones de Managed Service para Apache Spark.
Habilita la API de Data Lineage.
Próximos cambios en el linaje de datos de Spark Consulta las Notas de versión de Managed Service para Apache Spark para conocer el anuncio de un cambio que hará que el linaje de datos de Spark esté disponible automáticamente para tus proyectos, cargas de trabajo por lotes y sesiones interactivas cuando habilites la API de Data Lineage (consulta Cómo controlar la transferencia del linaje para un servicio) sin necesidad de configuración adicional del proyecto, la carga de trabajo por lotes o la sesión interactiva.
Roles obligatorios
Si tu carga de trabajo por lotes usa la cuenta de servicio predeterminada de Managed Service para Apache Spark, tiene el rol Managed Service for Apache Spark Worker, que contiene los permisos que requiere el linaje de datos.
Sin embargo, si tu carga de trabajo por lotes usa una cuenta de servicio personalizada para habilitar el linaje de datos, debes otorgar uno de los roles que se indican en el siguiente párrafo, que contienen los permisos requeridos por el linaje de datos, a la cuenta de servicio personalizada.
Para obtener los permisos que necesitas para usar el linaje de datos con Managed Service para Apache Spark, pídele a tu administrador que te otorgue los siguientes roles de IAM en tu cuenta de servicio personalizada de la carga de trabajo por lotes:
-
Otorga uno de los siguientes roles:
- Worker de Managed Service para Apache Spark (
roles/dataproc.worker) - Editor de linaje de datos (
roles/datalineage.editor) - Productor de linaje de datos (
roles/datalineage.producer) - Administrador de linaje de datos (
roles/datalineage.admin)
- Worker de Managed Service para Apache Spark (
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
También puedes obtener los permisos necesarios a través de roles personalizados o cualquier otro rol predefinido.
Habilita el linaje de datos de Spark
Puedes habilitar el linaje de datos de Spark para tu proyecto, carga de trabajo por lotes o sesión interactiva.
Habilita el linaje de datos a nivel del proyecto
Después de habilitar el linaje de datos de Spark a nivel del proyecto, los trabajos de Spark posteriores que se ejecuten en una carga de trabajo por lotes o en una sesión interactiva tendrán habilitado el linaje de datos de Spark.
Para habilitar el linaje de datos de Spark en tu proyecto, establece los siguientes metadatos personalizados del proyecto:
| Clave | Valor |
|---|---|
DATAPROC_LINEAGE_ENABLED |
true |
Para inhabilitar el linaje de datos de Spark en un proyecto, configura los metadatos DATAPROC_LINEAGE_ENABLED como false.
Habilita el linaje de datos en una carga de trabajo por lotes de Spark
Para habilitar el linaje de datos en una carga de trabajo por lotes, establece la propiedad spark.dataproc.lineage.enabled en true cuando envíes la carga de trabajo. Este parámetro de configuración anula cualquier parámetro de configuración del linaje de datos de Spark a nivel del proyecto: Si el linaje de datos de Spark está inhabilitado a nivel del proyecto, pero habilitado para la carga de trabajo por lotes, tendrá prioridad el parámetro de configuración de la carga de trabajo por lotes.
Puedes inhabilitar el linaje de datos de Spark en una carga de trabajo por lotes de Spark. Para ello, establece la propiedad spark.dataproc.lineage.enabled en false cuando envíes la carga de trabajo.
En este ejemplo, se usa gcloud CLI para enviar una carga de trabajo por lotes de lineage-example.py con el linaje de Spark habilitado.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
El siguiente código de lineage-example.py lee datos de una tabla pública de BigQuery y, luego, escribe el resultado en una tabla nueva en un conjunto de datos existente de BigQuery. Usa un bucket de Cloud Storage para el almacenamiento temporal.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
Reemplaza lo siguiente:
- REGION: La región en la que se ejecutará la carga de trabajo
- BUCKET: Nombre de un bucket de Cloud Storage existente para almacenar dependencias
- PROJECT_ID, DATASET y TABLE: El ID del proyecto, el nombre de un conjunto de datos de BigQuery existente y el nombre de una tabla nueva que se creará en el conjunto de datos (la tabla no debe existir)
Puedes ver el gráfico de linaje en la IU de Knowledge Catalog.
Habilita el linaje de datos en una sesión interactiva de Spark o en una plantilla de sesión
Para habilitar el linaje de datos en una sesión interactiva o una plantilla de sesión de Spark, establece la propiedad spark.dataproc.lineage.enabled en true cuando crees la sesión o la plantilla de sesión. Este parámetro de configuración anula cualquier parámetro de configuración del linaje de datos de Spark a nivel del proyecto: Si el linaje de datos de Spark está inhabilitado a nivel del proyecto, pero habilitado para la sesión interactiva, el parámetro de configuración de la sesión interactiva tiene prioridad.
Puedes inhabilitar el linaje de datos de Spark en una sesión interactiva o una plantilla de sesión de Spark si configuras la propiedad spark.dataproc.lineage.enabled en false cuando creas la sesión interactiva o la plantilla de sesión.
El siguiente código de notebook de PySpark configura una sesión interactiva de Managed Service para Apache Spark con el linaje de datos de Spark habilitado. Luego, crea una sesión de Spark Connect que ejecuta una consulta de recuento de palabras en un conjunto de datos públicos de Shakespeare de BigQuery y, luego, escribe el resultado en una tabla nueva en un conjunto de datos existente de BigQuery (consulta Cómo crear una sesión de Spark en un notebook de BigQuery Studio) .
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
Reemplaza lo siguiente:
- PROJECT_ID, DATASET y TABLE: El ID del proyecto, el nombre de un conjunto de datos de BigQuery existente y el nombre de una tabla nueva que se creará en el conjunto de datos (la tabla no debe existir)
Para ver el gráfico del linaje de datos, haz clic en el nombre de la tabla de destino que aparece en el panel de navegación de la página Explorador de BigQuery y, luego, selecciona la pestaña Linaje en el panel de detalles de la tabla.
Consultar el linaje en Knowledge Catalog
Un gráfico de linaje muestra las relaciones entre los recursos de tu proyecto y los procesos que los crearon. Puedes ver la información del linaje de datos en la consola de Google Cloud o recuperar la información de la API de Data Lineage como datos JSON.
¿Qué sigue?
- Obtén más información sobre el linaje de datos.
- Prueba el linaje de datos en un lab interactivo: Capture and Explore Data Updates With Data Lineage and OpenLineage in Dataplex.