将数据沿袭与 Managed Service for Apache Spark 搭配使用

本文档介绍了如何在 项目批处理工作负载交互式会话级层为 Managed Service for Apache Spark 批处理工作负载和交互式会话启用 数据沿袭

概览

数据沿袭是 Dataplex Universal Catalog 的一项功能,可让您跟踪数据在系统中的移动方式:数据来自何处 、传递到何处以及对其应用了哪些转换。

Managed Service for Apache Spark 工作负载和会话会捕获沿袭事件并将其发布到 Dataplex Universal Catalog Data Lineage API。 Managed Service for Apache Spark 使用 OpenLineage Spark plugin通过 OpenLineage与 Data Lineage API 集成。

您可以通过 Dataplex Universal Catalog 使用 沿袭图Data Lineage API 访问沿袭信息。 如需了解详情,请参阅在 Dataplex Universal Catalog 中查看沿袭图

可用性

数据沿袭支持 BigQuery 和 Cloud Storage 数据源,适用于使用 受支持的 Managed Service for Apache Spark 运行时版本 运行的工作负载和会话,但存在以下例外情况和限制:

  • 数据沿袭不适用于 SparkR 或 Spark 流式工作负载或会话。

准备工作

  1. 在 Google Cloud 控制台的项目选择器页面中,选择要用于 Managed Service for Apache Spark 工作负载或会话的项目 。

    转到“项目选择器”

  2. 启用 Data Lineage API。

    启用 API

    即将进行的 Spark 数据沿袭更改:如需了解相关公告,请参阅 Managed Service for Apache Spark 版本说明 。该公告将说明,当您启用 Data Lineage API(请参阅 控制服务的沿袭注入)时,系统将自动为您的项目、批处理工作负载和交互式会话提供 Spark 数据沿袭,而无需额外的项目、批处理工作负载或交互式会话设置。

所需的角色

如果您的批处理工作负载使用 默认的 Managed Service for Apache Spark 服务账号, 则该账号具有 Managed Service for Apache Spark Worker 角色,其中包含数据沿袭所需的权限。

但是,如果您的批处理工作负载使用自定义服务帐号来启用数据沿袭,则必须向该自定义服务帐号授予下段中列出的一个 角色,这些角色包含数据沿袭所需的权限。

如需获得将数据沿袭与 Managed Service for Apache Spark 搭配使用所需的权限,请让您的管理员为您授予批处理工作负载自定义服务帐号的以下 IAM 角色:

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您也可以通过自定义 角色或其他预定义 角色来获取所需的权限。

启用 Spark 数据沿袭

您可以为项目、批处理工作负载或交互式会话启用 Spark 数据沿袭。

在项目级层启用数据沿袭

在项目级层启用 Spark 数据沿袭后,在批处理工作负载或交互式会话中运行的后续 Spark 作业将启用 Spark 数据沿袭。

如需在项目上启用 Spark 数据沿袭, 请设置以下自定义项目元数据

DATAPROC_LINEAGE_ENABLED true

您可以通过将 DATAPROC_LINEAGE_ENABLED 元数据设置为 false 来停用项目的 Spark 数据沿袭。

在 Spark 批处理工作负载中启用数据沿袭

如需在批处理工作负载中启用数据沿袭,请在提交工作负载时将 spark.dataproc.lineage.enabled 属性设置为 true。此设置会覆盖项目级层的任何 Spark 数据沿袭 设置:如果在 项目级层停用了 Spark 数据沿袭,但为批处理工作负载启用了该沿袭,则以批处理工作负载设置为准。

您可以通过在提交工作负载时将 spark.dataproc.lineage.enabled 属性设置为 false 来停用 Spark 批处理工作负载中的 Spark 数据沿袭。

此示例使用 gcloud CLI 提交启用了 Spark 沿袭的批处理 lineage-example.py 工作负载。

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

以下 lineage-example.py 代码从公共 BigQuery 表中读取数据,然后将输出写入现有 BigQuery 数据集中的新表。它使用 Cloud Storage 存储桶进行临时存储。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

替换以下内容:

  • REGION:运行工作负载的区域
  • BUCKET:用于存储依赖项的现有 Cloud Storage 存储桶的名称
  • PROJECT_IDDATASETTABLE: 项目 ID、现有 BigQuery 数据集的名称,以及 要在该数据集中创建的新表的名称(该表不得存在)

您可以在 Dataplex Universal Catalog 界面中查看沿袭图。

Spark 沿袭图

在 Spark 交互式会话或会话模板中启用数据沿袭

如需在 Spark 交互式会话或会话模板中启用数据沿袭, 请在创建会话或会话模板时将 spark.dataproc.lineage.enabled 属性设置为 true此设置会覆盖项目级层的任何 Spark 数据沿袭 设置:如果在 项目级层停用了 Spark 数据沿袭,但为交互式会话启用了该沿袭, 则以交互式会话设置为准。

您可以通过在创建交互式会话或会话模板时将 spark.dataproc.lineage.enabled 属性设置为 false 来停用 Spark 交互式会话或会话模板中的 Spark 数据沿袭。

以下 PySpark 笔记本代码配置了启用了 Spark 数据沿袭的 Managed Service for Apache Spark 交互式会话。然后,它会创建一个 Spark Connect 会话,该会话对公共 BigQuery Shakespeare 数据集运行字数统计查询,然后将输出写入现有 BigQuery 数据集中的新表(请参阅 在 BigQuery Studio 笔记本中创建 Spark 会话)。

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

替换以下内容:

  • PROJECT_IDDATASETTABLE: 项目 ID、现有 BigQuery 数据集的名称,以及 要在该数据集中创建的新表的名称(该表不得存在)

您可以点击 BigQuery 探索器 页面导航窗格中列出的目标表名称,然后在表详细信息窗格中选择沿袭标签页,以查看数据沿袭图。

Spark 沿袭图

在 Dataplex Universal Catalog 中查看沿袭

沿袭图显示了项目资源与创建这些资源的进程之间的关系。您可以在 控制台 中查看数据沿袭信息,也可以从 Google Cloud Data Lineage API 中以 JSON 数据的形式检索该信息。

后续步骤