本文档介绍了如何在项目、批量工作负载或交互式会话级层为Google Cloud Serverless for Apache Spark 批量工作负载和交互式会话启用数据沿袭。
概览
数据沿袭是 Dataplex Universal Catalog 的一项功能,可让您跟踪数据在系统中的移动方式:数据来自何处、传递到何处以及对其应用了哪些转换。
Google Cloud Serverless for Apache Spark 工作负载和会话会捕获沿袭事件并将其发布到 Dataplex Universal Catalog Data Lineage API。Serverless for Apache Spark 使用 OpenLineage Spark 插件通过 OpenLineage 与 Data Lineage API 集成。
您可以通过 Dataplex Universal Catalog 使用沿袭图和 Data Lineage API 访问沿袭信息。如需了解详情,请参阅在 Dataplex Universal Catalog 中查看沿袭图。
可用性
数据沿袭支持 BigQuery 和 Cloud Storage 数据源,适用于使用受支持的 Serverless for Apache Spark 运行时版本运行的工作负载和会话,但存在以下例外情况和限制:
- 数据沿袭不适用于 SparkR 或 Spark 流式工作负载或会话。
准备工作
在 Google Cloud 控制台的项目选择器页面上,选择要用于 Serverless for Apache Spark 工作负载或会话的项目。
启用 Data Lineage API。
即将推出的 Spark 数据沿袭变更:请参阅 Serverless for Apache Spark 版本说明,了解相关公告。该变更将自动为您的项目、批量工作负载和交互式会话提供 Spark 数据沿袭功能,前提是您已启用 Data Lineage API(请参阅控制服务的沿袭提取),而无需进行额外的项目、批量工作负载或交互式会话设置。
所需的角色
如果您的批量工作负载使用默认的 Serverless for Apache Spark 服务账号,则该账号拥有 Dataproc Worker 角色,其中包含数据沿袭所需的权限。
不过,如果您的批处理工作负载使用自定义服务账号来启用数据沿袭,则必须向该自定义服务账号授予下段中列出的一个角色,其中包含数据沿袭所需的权限。
如需获得将数据沿袭与 Dataproc 搭配使用所需的权限,请让您的管理员为您授予批量工作负载自定义服务账号的以下 IAM 角色:
-
授予以下角色之一:
-
Dataproc Worker (
roles/dataproc.worker) -
Data Lineage Editor (
roles/datalineage.editor) -
Data Lineage Producer (
roles/datalineage.producer) -
Data Lineage Administrator (
roles/datalineage.admin)
-
Dataproc Worker (
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
启用 Spark 数据沿袭
您可以为项目、批量工作负载或交互式会话启用 Spark 数据沿袭。
在项目级层启用数据沿袭
在项目级层启用 Spark 数据沿袭后,在批量工作负载或交互式会话中运行的后续 Spark 作业将启用 Spark 数据沿袭。
如需在项目级层启用 Spark 数据沿袭,请设置以下自定义项目元数据:
| 键 | 值 |
|---|---|
DATAPROC_LINEAGE_ENABLED |
true |
您可以通过将 DATAPROC_LINEAGE_ENABLED 元数据设置为 false 来为项目停用 Spark 数据沿袭。
在 Spark 批处理工作负载上启用数据沿袭
如需在批处理工作负载上启用数据沿袭,请在提交工作负载时将 spark.dataproc.lineage.enabled 属性设置为 true。此设置会替换项目级的任何 Spark 数据沿袭设置:如果项目级停用了 Spark 数据沿袭,但为批处理工作负载启用了该功能,则批处理工作负载设置优先。
您可以在提交 Spark 批处理工作负载时将 spark.dataproc.lineage.enabled 属性设置为 false,以在该工作负载上停用 Spark 数据沿袭。
此示例使用 gcloud CLI 提交启用了 Spark 沿袭的批处理 lineage-example.py 工作负载。
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
以下 lineage-example.py 代码从公共 BigQuery 表中读取数据,然后将输出写入现有 BigQuery 数据集中的新表。它使用 Cloud Storage 存储桶进行临时存储。
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
替换以下内容:
- REGION:运行工作负载的区域
- BUCKET:用于存储依赖项的现有 Cloud Storage 存储桶的名称
- PROJECT_ID、DATASET 和 TABLE:项目 ID、现有 BigQuery 数据集的名称,以及要在该数据集中创建的新表的名称(该表不得存在)
您可以在 Dataplex Universal Catalog 界面中查看沿袭图。
在 Spark 交互式会话或会话模板中启用数据沿袭
如需在 Spark 交互式会话或会话模板中启用数据沿袭,请在创建会话或会话模板时将 spark.dataproc.lineage.enabled 属性设置为 true。此设置会替换项目级的任何 Spark 数据沿袭设置:如果项目级停用了 Spark 数据沿袭,但为交互式会话启用了该功能,则交互式会话设置优先。
您可以在创建 Spark 交互式会话或会话模板时,将 spark.dataproc.lineage.enabled 属性设置为 false,以停用 Spark 交互式会话或会话模板中的 Spark 数据沿袭。
以下 PySpark 笔记本代码配置了一个启用了 Spark 数据沿袭的 Serverless for Apache Spark 交互式会话。然后,它会创建一个 Spark Connect 会话,该会话对公共 BigQuery Shakespeare 数据集运行字数统计查询,然后将输出写入现有 BigQuery 数据集中的新表(请参阅在 BigQuery Studio 笔记本中创建 Spark 会话)。
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
替换以下内容:
- PROJECT_ID、DATASET 和 TABLE:项目 ID、现有 BigQuery 数据集的名称,以及要在该数据集中创建的新表的名称(该表不得存在)
如需查看数据沿袭图,请在 BigQuery 探索器页面的导航窗格中点击列出的目标表名称,然后选择表详情窗格中的“沿袭”标签页。
在 Dataplex Universal Catalog 中查看沿袭
沿袭图显示了项目资源与创建这些资源的进程之间的关系。您可以在 Google Cloud 控制台中查看数据沿袭信息,也可以从 Data Lineage API 中以 JSON 数据的形式检索该信息。
后续步骤
- 详细了解数据沿袭。
- 在互动式实验中试用数据沿袭功能:在 Dataplex 中使用数据沿袭和 OpenLineage 捕获和探索数据更新。