Como usar a linhagem de dados do Spark

Este documento descreve como ativar a linhagem de dados para seus jobs do Serviço Gerenciado para Apache Spark no nível do projeto ou do cluster.

A linhagem de dados é um recurso do Knowledge Catalog que permite acompanhar como os dados se movimentam nos sistemas: de onde vêm, para onde vão e quais transformações são aplicadas a eles.

A linhagem de dados está disponível para todos os jobs do Serviço Gerenciado para Apache Spark, exceto os jobs de streaming do SparkR e do Spark, e oferece suporte a fontes de dados do BigQuery e do Cloud Storage. Ela está incluída nas versões de imagem 2.0.74+, 2.1.22+, 2.2.50+, 2.3.1+ e 3.0 do Serviço Gerenciado para Apache Spark.

Depois de ativar o recurso no cluster do Serviço Gerenciado para Apache Spark, os jobs do Spark do Serviço Gerenciado para Apache Spark capturam eventos de linhagem de dados e os publicam na API Data Lineage do Knowledge Catalog. O Serviço Gerenciado para Apache Spark se integra à API Data Lineage pelo OpenLineage, usando o plug-in OpenLineage Spark.

É possível acessar informações de linhagem de dados pelo Knowledge Catalog, usando o seguinte:

Antes de começar

  1. No Google Cloud console do, na página do seletor de projetos, selecione o projeto que contém o cluster do Serviço Gerenciado para Apache Spark para o qual você quer rastrear a linhagem.

    Acessar o seletor de projetos

  2. Ative a API Data Lineage.

    Ativar as APIs

    Próximas mudanças na linhagem de dados do Spark Consulte as notas de lançamento do Serviço Gerenciado para Apache Spark para conferir o anúncio de uma mudança que disponibilizará automaticamente a linhagem de dados do Spark para seus projetos e clusters quando você ativar a API Data Lineage (consulte Controlar a ingestão de linhagem de um serviço) sem exigir configurações adicionais de projeto ou cluster.

Funções exigidas

Se você criar um cluster do Serviço Gerenciado para Apache Spark usando a conta de serviço da VM padrão, ele terá o Managed Service for Apache Spark Worker papel, que ativa a linhagem de dados. Nenhuma outra ação é necessária.

No entanto, se você criar um cluster do Serviço Gerenciado para Apache Spark que usa uma conta de serviço personalizada, para ativar a linhagem de dados no cluster, conceda um papel necessário a conta de serviço personalizada, conforme explicado no parágrafo a seguir.

Para receber as permissões necessárias para usar a linhagem de dados com o Serviço Gerenciado para Apache Spark, peça ao administrador para conceder a você os seguintes papéis do IAM na conta de serviço personalizada do cluster:

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias usando personalizados papéis ou outros predefinidos papéis.

Ativar a linhagem de dados do Spark

É possível ativar a linhagem de dados do Spark no nível do projeto ou do cluster.

Ativar a linhagem de dados do Spark no nível do projeto

Depois de ativar a linhagem de dados do Spark no nível do projeto, os jobs do Spark subsequentes executados em clusters do Serviço Gerenciado para Apache Spark no projeto terão a linhagem de dados do Spark ativada.

Para ativar a linhagem de dados do Spark no nível do projeto, defina os seguintes metadados personalizados do projeto:

Chave Valor
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform
A definição desse escopo de acesso à VM só é necessária para clusters da versão de imagem 2.0. Ele é definido automaticamente em clusters da versão de imagem 2.1 e mais recentes.

É possível desativar a linhagem de dados do Spark no nível do projeto definindo os metadados DATAPROC_LINEAGE_ENABLED como false.

Ativar a linhagem de dados do Spark no nível do cluster

Se você ativar a linhagem de dados do Spark ao criar um cluster, os jobs do Spark com suporte executados em clusters do Serviço Gerenciado para Apache Spark terão a linhagem de dados do Spark ativada. Essa configuração substitui qualquer configuração de linhagem de dados do Spark no nível do projeto: se a linhagem de dados do Spark estiver desativada no nível do projeto, mas ativada no nível do cluster, o nível do cluster terá precedência, e os jobs do Spark com suporte executados no cluster terão a linhagem de dados ativada.

Para ativar a linhagem de dados do Spark em um cluster, crie um cluster do Serviço Gerenciado para Apache Spark com a propriedade de cluster dataproc:dataproc.lineage.enabled definida como true.

Exemplo da CLI gcloud:

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

É possível desativar a linhagem de dados do Spark em um cluster definindo a propriedade dataproc:dataproc.lineage.enabled como false ao criar o cluster.

  • Desativar a linhagem de dados em um cluster: para criar um cluster com a linhagem desativada, defina dataproc:dataproc.lineage.enabled=false. Depois da criação do cluster, não é possível desativar a linhagem de dados do Spark no cluster. Para desativar a linhagem de dados do Spark em um cluster atual, você pode recriar o cluster com a propriedade dataproc:dataproc.lineage.enabled definida como false.

  • Definir o escopo em clusters da versão de imagem 2.0: o escopo de acesso à VM do cluster do Serviço Gerenciado para Apache Spark cloud-platform é necessário para a linhagem de dados do Spark. Os clusters da versão de imagem do Serviço Gerenciado para Apache Spark criados com a versão de imagem 2.1 e mais recentes têm cloud-platform ativado. Se você especificar a versão de imagem 2.0 do Serviço Gerenciado para Apache Spark ao criar um cluster, defina o escopo como cloud-platform.

Desativar a linhagem de dados do Spark em um job

Se a linhagem de dados do Spark estiver ativada em um cluster, é possível desativá-la em um job transmitindo a propriedade spark.extraListeners com um valor vazio ("") ao enviar o job.

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.extraListeners=''

Enviar um job do Spark

Ao enviar um job do Spark com suporte em um cluster do Serviço Gerenciado para Apache Spark criado com a linhagem de dados do Spark ativada, o Serviço Gerenciado para Apache Spark captura e informa as informações de linhagem de dados à API Data Lineage.

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

Observações:

  • A adição das propriedades spark.openlineage.namespace e spark.openlineage.appName, que são usadas para identificar o job de maneira exclusiva, é opcional. Se você não adicionar essas propriedades, o Serviço Gerenciado para Apache Spark usará os seguintes valores padrão:
    • Valor padrão para spark.openlineage.namespace: PROJECT_ID
    • Valor padrão para spark.openlineage.appName: spark.app.name

Ver a linhagem no Knowledge Catalog

Um gráfico de linhagem mostra as relações entre os recursos do projeto e os processos que os criaram. É possível visualizar informações de linhagem de dados no Google Cloud console, ou recuperá-las da API Data Lineage na forma de dados JSON.

Exemplo de código do PySpark:

O job do PySpark a seguir lê dados de uma tabela pública do BigQuery e grava a saída em uma nova tabela em um conjunto de dados do BigQuery. Ele usa um bucket do Cloud Storage para armazenamento temporário.

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

Faça as seguintes substituições:

  • BUCKET: o nome de um bucket do Cloud Storage

  • PROJECT_ID, DATASET, e TABLE: o ID do projeto, o nome de um conjunto de dados do BigQuery e o nome de uma nova tabela a ser criada no conjunto de dados (a tabela não pode existir)

É possível visualizar o gráfico de linhagem na interface do Knowledge Catalog.

Exemplo de gráfico de linhagem

A seguir