Uso del linaje de datos de Spark

En este documento, se describe cómo habilitar el linaje de datos para tus trabajos de Managed Service para Apache Spark a nivel del proyecto o del clúster.

El linaje de datos es una función de Knowledge Catalog que te permite hacer un seguimiento de cómo los datos se mueven a través de tus sistemas: de dónde provienen, a dónde se pasan y qué transformaciones se aplican a ellos.

El linaje de datos está disponible para todos los trabajos de Managed Service para Apache Spark, excepto los trabajos de SparkR y Spark Streaming, y admite fuentes de datos de BigQuery y Cloud Storage. Se incluye con las versiones de imagen 2.0.74+, 2.1.22+, 2.2.50+, 2.3.1+ y 3.0+ de Managed Service para Apache Spark.

Una vez que habilites la función en tu clúster de Managed Service para Apache Spark, los trabajos de Spark de Managed Service para Apache Spark capturarán eventos de linaje de datos y los publicarán en la API de Data Lineage de Knowledge Catalog. Managed Service para Apache Spark se integra con la API de Data Lineage a través de OpenLineage, con el complemento de OpenLineage para Spark.

Puedes acceder a la información del linaje de datos a través de Knowledge Catalog con las siguientes opciones:

Antes de comenzar

  1. En la consola de Google Cloud , en la página del selector de proyectos, selecciona el proyecto que contiene el clúster de Managed Service para Apache Spark para el que deseas hacer un seguimiento del linaje.

    Ir al selector de proyectos

  2. Habilita la API de Data Lineage.

    Habilitar las API

    Próximos cambios en el linaje de datos de Spark Consulta las notas de la versión de Managed Service para Apache Spark para conocer el anuncio de un cambio que hará que el linaje de datos de Spark esté disponible automáticamente para tus proyectos y clústeres cuando habilites la API de Data Lineage (consulta Cómo controlar la transferencia del linaje para un servicio) sin necesidad de realizar ajustes adicionales a nivel del proyecto o del clúster.

Roles obligatorios

Si creas un clúster de Managed Service para Apache Spark con la cuenta de servicio de VM predeterminada, tendrá el rol Managed Service for Apache Spark Worker, que habilita el linaje de datos. No es necesario realizar ninguna acción adicional.

Sin embargo, si creas un clúster de Managed Service para Apache Spark que usa una cuenta de servicio personalizada, para habilitar el linaje de datos en el clúster, debes otorgar un rol requerido a la cuenta de servicio personalizada, como se explica en el siguiente párrafo.

Para obtener los permisos que necesitas para usar el linaje de datos con Managed Service para Apache Spark, pídele a tu administrador que te otorgue los siguientes roles de IAM en la cuenta de servicio personalizada de tu clúster:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios a través de roles personalizados o cualquier otro rol predefinido.

Habilita el linaje de datos de Spark

Puedes habilitar el linaje de datos de Spark a nivel del proyecto o del clúster.

Habilita el linaje de datos de Spark a nivel del proyecto

Después de habilitar el linaje de datos de Spark a nivel del proyecto, los trabajos de Spark posteriores que se ejecuten en clústeres de Managed Service para Apache Spark en el proyecto tendrán habilitado el linaje de datos de Spark.

Para habilitar el linaje de datos de Spark a nivel del proyecto, establece los siguientes metadatos personalizados del proyecto:

Clave Valor
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform
Establecer este permiso de acceso a la VM solo es necesario para los clústeres con la versión de imagen 2.0. Se establece automáticamente en los clústeres con la versión de imagen 2.1 y versiones posteriores.

Puedes inhabilitar el linaje de datos de Spark a nivel del proyecto si configuras los metadatos DATAPROC_LINEAGE_ENABLED como false.

Habilita el linaje de datos de Spark a nivel del clúster

Si habilitas el linaje de datos de Spark cuando creas un clúster, los trabajos de Spark compatibles que se ejecuten en clústeres de Managed Service para Apache Spark tendrán habilitado el linaje de datos de Spark. Este parámetro de configuración anula cualquier parámetro de configuración del linaje de datos de Spark a nivel del proyecto: Si el linaje de datos de Spark está inhabilitado a nivel del proyecto, pero habilitado a nivel del clúster, el nivel del clúster tiene prioridad y los trabajos de Spark compatibles que se ejecutan en el clúster tendrán habilitado el linaje de datos.

Para habilitar el linaje de datos de Spark en un clúster, crea un clúster de Managed Service para Apache Spark con la propiedad del clúster dataproc:dataproc.lineage.enabled establecida en true.

Ejemplo de gcloud CLI:

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

Puedes inhabilitar el linaje de datos de Spark en un clúster configurando la propiedad dataproc:dataproc.lineage.enabled en false cuando crees el clúster.

  • Inhabilita el linaje de datos en un clúster: Para crear un clúster con el linaje inhabilitado, establece dataproc:dataproc.lineage.enabled=false. Después de crear el clúster, no puedes inhabilitar el linaje de datos de Spark en el clúster. Para inhabilitar el linaje de datos de Spark en un clúster existente, puedes volver a crear el clúster con la propiedad dataproc:dataproc.lineage.enabled establecida en false.

  • Establece el alcance en clústeres con la versión de imagen 2.0: Se requiere el alcance cloud-platform para el linaje de datos de Spark. Los clústeres de la versión de imagen de Managed Service para Apache Spark creados con la versión de imagen 2.1 y versiones posteriores tienen habilitado cloud-platform. Si especificas la versión de imagen 2.0 de Managed Service para Apache Spark cuando creas un clúster, establece el alcance en cloud-platform.

Inhabilita el linaje de datos de Spark en un trabajo

Si el linaje de datos de Spark está habilitado en un clúster, puedes inhabilitarlo en un trabajo pasando la propiedad spark.extraListeners con un valor vacío ("") cuando envíes el trabajo.

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.extraListeners=''

Enviar un trabajo de Spark

Cuando envías un trabajo de Spark compatible en un clúster de Managed Service para Apache Spark que se creó con el linaje de datos de Spark habilitado, Managed Service para Apache Spark captura y registra la información del linaje de datos en la API de Data Lineage.

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

Notas:

  • Es opcional agregar las propiedades spark.openlineage.namespace y spark.openlineage.appName, que se usan para identificar el trabajo de forma única. Si no agregas estas propiedades, Managed Service para Apache Spark usará los siguientes valores predeterminados:
    • Valor predeterminado para spark.openlineage.namespace: PROJECT_ID
    • Valor predeterminado para spark.openlineage.appName: spark.app.name

Consultar el linaje en Knowledge Catalog

Un gráfico de linaje muestra las relaciones entre los recursos de tu proyecto y los procesos que los crearon. Puedes ver la información del linaje de datos en la consola de Google Cloud o recuperarla de la API de Data Lineage en forma de datos JSON.

Ejemplo de código de PySpark:

El siguiente trabajo de PySpark lee datos de una tabla pública de BigQuery y, luego, escribe el resultado en una tabla nueva en un conjunto de datos existente de BigQuery. Usa un bucket de Cloud Storage para el almacenamiento temporal.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

Realiza los siguientes reemplazos:

  • BUCKET: Es el nombre de un bucket de Cloud Storage existente.

  • PROJECT_ID, DATASET y TABLE: El ID del proyecto, el nombre de un conjunto de datos de BigQuery existente y el nombre de una tabla nueva que se creará en el conjunto de datos (la tabla no debe existir)

Puedes ver el gráfico de linaje en la IU de Knowledge Catalog.

Gráfico de linaje de muestra

¿Qué sigue?