Usar linhagem de dados com o Serverless para Apache Spark

Este documento descreve como ativar a linhagem de dados emGoogle Cloud cargas de trabalho em lote e sessões interativas do Serverless para Apache Spark no nível do projeto, da carga de trabalho em lote ou da sessão interativa.

Visão geral

A linhagem de dados é um recurso do Dataplex Universal Catalog que permite acompanhar como os dados se movimentam nos sistemas: origem, destino e quais transformações são aplicadas a eles.

Google Cloud As cargas de trabalho e sessões do Serverless para Apache Spark capturam eventos de linhagem e os publicam na API Data Lineage do Dataplex Universal Catalog . O Serverless para Apache Spark se integra à API Data Lineage usando o OpenLineage (link em inglês) e o plug-in OpenLineage Spark (link em inglês).

É possível acessar informações de linhagem pelo Dataplex Universal Catalog usando gráficos de linhagem e a API Data Lineage. Para mais informações, consulte Visualizar gráficos de linhagem no Dataplex Universal Catalog.

Disponibilidade

A linhagem de dados, que oferece suporte a fontes de dados do BigQuery e do Cloud Storage, está disponível para cargas de trabalho e sessões executadas com versões de ambiente de execução sem servidor para Apache Spark compatíveis, com as seguintes exceções e limitações:

  • A linhagem de dados não está disponível para cargas de trabalho ou sessões do SparkR ou do Spark Streaming.

Antes de começar

  1. Na página do seletor de projetos no console Google Cloud , selecione o projeto a ser usado nas cargas de trabalho ou sessões do Serverless para Apache Spark.

    Acessar o seletor de projetos

  2. Ative a API Data Lineage.

    Ativar as APIs

    Próximas mudanças na linhagem de dados do Spark: consulte as notas da versão do Serverless para Apache Spark para saber mais sobre uma mudança que vai disponibilizar automaticamente a linhagem de dados do Spark para seus projetos, cargas de trabalho em lote e sessões interativas quando você ativar a API Data Lineage (consulte Controlar a ingestão de linhagem para um serviço) sem exigir configurações adicionais de projeto, carga de trabalho em lote ou sessão interativa.

Funções exigidas

Se a carga de trabalho em lote usar a conta de serviço padrão do Serverless para Apache Spark, ela terá o papel Dataproc Worker, que contém as permissões necessárias para a linhagem de dados.

No entanto, se sua carga de trabalho em lote usar uma conta de serviço personalizada para ativar a linhagem de dados, conceda um dos papéis listados no parágrafo a seguir, que contêm as permissões necessárias para a linhagem de dados, à conta de serviço personalizada.

Para receber as permissões necessárias para usar a linhagem de dados com o Dataproc, peça ao administrador para conceder a você os seguintes papéis do IAM na conta de serviço personalizada da carga de trabalho em lote:

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias usando papéis personalizados ou outros papéis predefinidos.

Ativar a linhagem de dados do Spark

É possível ativar a linhagem de dados do Spark para seu projeto, carga de trabalho em lote ou sessão interativa.

Ativar a linhagem de dados no nível do projeto

Depois de ativar a linhagem de dados do Spark no nível do projeto, os jobs do Spark subsequentes que forem executados em uma carga de trabalho em lote ou em uma sessão interativa terão a linhagem de dados do Spark ativada.

Para ativar a linhagem de dados do Spark no seu projeto, defina os seguintes metadados personalizados do projeto:

Chave Valor
DATAPROC_LINEAGE_ENABLED true

Para desativar a linhagem de dados do Spark em um projeto, defina os metadados DATAPROC_LINEAGE_ENABLED como false.

Ativar a linhagem de dados em uma carga de trabalho em lote do Spark

Para ativar a linhagem de dados em uma carga de trabalho em lote, defina a propriedade spark.dataproc.lineage.enabled como true ao enviar a carga de trabalho. Essa configuração substitui qualquer configuração de linhagem de dados do Spark no nível do projeto: se a linhagem de dados do Spark estiver desativada no nível do projeto, mas ativada para a carga de trabalho em lote, a configuração da carga de trabalho em lote terá precedência.

É possível desativar a linhagem de dados do Spark em uma carga de trabalho em lote do Spark definindo a propriedade spark.dataproc.lineage.enabled como false ao enviar a carga de trabalho.

Este exemplo usa a CLI gcloud para enviar uma carga de trabalho lineage-example.py em lote com a linhagem do Spark ativada.

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

O código lineage-example.py a seguir lê dados de uma tabela pública do BigQuery e grava a saída em uma nova tabela em um conjunto de dados do BigQuery. Ele usa um bucket do Cloud Storage para armazenamento temporário.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

Substitua:

  • REGION: a região para executar a carga de trabalho.
  • BUCKET: o nome de um bucket do Cloud Storage para armazenar dependências
  • PROJECT_ID, DATASET e TABLE: o ID do projeto, o nome de um conjunto de dados do BigQuery e o nome de uma nova tabela a ser criada no conjunto de dados (a tabela não pode existir).

É possível conferir o gráfico de linhagem na interface do Dataplex Universal Catalog.

Gráfico de linhagem do Spark

Ativar a linhagem de dados em uma sessão interativa ou modelo de sessão do Spark

Para ativar a linhagem de dados em uma sessão interativa ou modelo de sessão do Spark, defina a propriedade spark.dataproc.lineage.enabled como true ao criar a sessão ou o modelo de sessão. Essa configuração substitui qualquer configuração de linhagem de dados do Spark no nível do projeto: se a linhagem de dados do Spark estiver desativada no nível do projeto, mas ativada para a sessão interativa, a configuração da sessão interativa terá precedência.

É possível desativar a linhagem de dados do Spark em uma sessão interativa ou um modelo de sessão do Spark definindo a propriedade spark.dataproc.lineage.enabled como false ao criar a sessão interativa ou o modelo de sessão.

O código do notebook PySpark a seguir configura uma sessão interativa do Serverless para Apache Spark com a linhagem de dados do Spark ativada. Em seguida, ele cria uma sessão do Spark Connect que executa uma consulta de contagem de palavras em um conjunto de dados público de Shakespeare do BigQuery e grava a saída em uma nova tabela em um conjunto de dados do BigQuery existente. Consulte Criar uma sessão do Spark em um notebook do BigQuery Studio.

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

Substitua:

  • PROJECT_ID, DATASET e TABLE: o ID do projeto, o nome de um conjunto de dados do BigQuery e o nome de uma nova tabela a ser criada no conjunto de dados (a tabela não pode existir).

Para ver o gráfico de linhagem de dados, clique no nome da tabela de destino listado no painel de navegação da página Explorer do BigQuery e selecione a guia "Linhagem" no painel de detalhes da tabela.

Gráfico de linhagem do Spark

Visualizar a linhagem no catálogo universal do Dataplex

Um gráfico de linhagem mostra as relações entre os recursos do projeto e os processos que os criaram. É possível ver informações de linhagem de dados no console do Google Cloud ou recuperar as informações da API Data Lineage como dados JSON.

A seguir