Este documento descreve como ativar a linhagem de dados em Google Cloud cargas de trabalho em lote e sessões interativas do Serverless para Apache Spark no nível do projeto, da carga de trabalho em lote ou da sessão interativa.
Visão geral
A linhagem de dados é um recurso do Dataplex Universal Catalog que permite acompanhar como os dados são transmitidos pelos sistemas: origem, destino e quais transformações são aplicadas a eles.
Google Cloud As cargas de trabalho e sessões do Serverless para Apache Spark capturam eventos de linhagem e os publicam em o Dataplex Universal Catalog API Data Lineage. O Serverless para Apache Spark é integrado à API Data Lineage pelo OpenLineage, usando o plug-in OpenLineage Spark.
É possível acessar informações de linhagem pelo Dataplex Universal Catalog, usando gráficos de linhagem e a API Data Lineage. Para mais informações, consulte Visualizar gráficos de linhagem no Dataplex Universal Catalog.
Disponibilidade, recursos e limitações
A linhagem de dados, que oferece suporte a fontes de dados do BigQuery e do Cloud Storage
, está disponível para cargas de trabalho e sessões executadas com
as versões de ambiente de execução do Serverless para Apache Spark 1.2, 2.2, 2.3 e 3.0,
com as seguintes exceções e limitações:
- A linhagem de dados não está disponível para cargas de trabalho ou sessões de streaming do SparkR ou do Spark.
Antes de começar
Na página do seletor de projetos no Google Cloud console, selecione o projeto a ser usado para as cargas de trabalho ou sessões do Serverless para Apache Spark.
Ative a API Data Lineage.
Funções exigidas
Se a carga de trabalho em lote usar a
conta de serviço padrão do Serverless para Apache Spark,
ela terá o papel Dataproc Worker, que ativa a linhagem de dados. Não é necessária nenhuma ação adicional.
No entanto, se a carga de trabalho em lote usar uma conta de serviço personalizada para ativar a linhagem de dados, você precisará conceder um papel obrigatório à conta de serviço personalizada, conforme explicado no parágrafo a seguir.
Para receber as permissões necessárias para usar a linhagem de dados com o Dataproc, peça ao administrador para conceder a você os seguintes papéis do IAM na conta de serviço personalizada da carga de trabalho em lote:
-
Conceda um dos seguintes papéis:
-
Worker do Dataproc (
roles/dataproc.worker) -
Editor de linhagem de dados (
roles/datalineage.editor) -
Produtor de linhagem de dados (
roles/datalineage.producer) -
Administrador de linhagem de dados (
roles/datalineage.admin)
-
Worker do Dataproc (
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Também é possível conseguir as permissões necessárias usando personalizados papéis ou outros predefinidos papéis.
Ativar a linhagem de dados no nível do projeto
É possível ativar a linhagem de dados no nível do projeto. Quando ativada no nível do projeto, todas as cargas de trabalho em lote e sessões interativas subsequentes executadas no projeto terão a linhagem do Spark ativada.
Como ativar a linhagem de dados no nível do projeto
Para ativar a linhagem de dados no nível do projeto, defina os seguintes metadados personalizados do projeto.
| Chave | Valor |
|---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
É possível desativar a linhagem de dados no nível do projeto definindo os
DATAPROC_LINEAGE_ENABLED metadados como false.
Ativar a linhagem de dados para uma carga de trabalho em lote do Spark
É possível ativar a linhagem de dados em uma carga de trabalho em lote
definindo a propriedade spark.dataproc.lineage.enabled como true ao
enviar a carga de trabalho.
Exemplo de carga de trabalho em lote
Este exemplo envia uma carga de trabalho lineage-example.py em lote com a linhagem do Spark
ativada.
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
lineage-example.py lê dados de uma tabela pública do BigQuery
e grava a saída em uma nova tabela em um conjunto de dados do BigQuery. Ele usa um bucket do Cloud Storage para armazenamento temporário.
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
Faça as seguintes substituições:
REGION: selecione uma região para executar a carga de trabalho.
BUCKET: o nome de um bucket do Cloud Storage para armazenar dependências.
PROJECT_ID, DATASET, e TABLE: insira o ID do projeto, o nome de um conjunto de dados do BigQuery e o nome de uma nova tabela a ser criada no conjunto de dados (a tabela não pode existir).
É possível visualizar o gráfico de linhagem na interface do Dataplex Universal Catalog.
Ativar a linhagem de dados para uma sessão interativa do Spark
É possível ativar a linhagem de dados em uma sessão interativa do Spark
definindo a propriedade spark.dataproc.lineage.enabled como true ao
criar a sessão ou o modelo de sessão.
Exemplo de sessão interativa
O código do notebook PySpark a seguir configura uma sessão interativa do Serverless para Apache Spark com a linhagem de dados do Spark ativada. Em seguida, ele cria uma sessão do Spark Connect que executa uma consulta de contagem de palavras em um conjunto de dados público do BigQuery Shakespeare e grava a saída em uma nova tabela em um conjunto de dados do BigQuery.
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
Faça as seguintes substituições:
- PROJECT_ID, DATASET, e TABLE: insira o ID do projeto, o nome de um conjunto de dados do BigQuery e o nome de uma nova tabela a ser criada no conjunto de dados (a tabela não pode existir).
É possível visualizar o gráfico de linhagem de dados clicando no nome da tabela de destino listado no painel de navegação na página Explorer do BigQuery, e selecionando a guia "Linhagem" no painel de detalhes da tabela.
Visualizar a linhagem no catálogo universal do Dataplex
Um gráfico de linhagem mostra as relações entre os recursos do projeto e os processos que os criaram. É possível visualizar informações de linhagem de dados no Google Cloud console ou recuperar as informações da API Data Lineage como dados JSON.
A seguir
- Saiba mais sobre a linhagem de dados.
- Teste em um laboratório interativo: Capturar e analisar atualizações de dados com linhagem de dados e OpenLineage