搭配使用資料沿襲和 Serverless for Apache Spark

本文說明如何在專案批次工作負載互動式工作階段層級,為Google Cloud Serverless for Apache Spark 批次工作負載和互動式工作階段啟用資料沿襲

總覽

資料歷程是 Dataplex Universal Catalog 的功能,可讓您追蹤資料在系統之間的移動情形,包括來源、傳遞目的地和採用的轉換機制。

Google Cloud Serverless for Apache Spark 工作負載和工作階段會擷取歷程事件,並發布至 Dataplex Universal Catalog Data Lineage API。Serverless for Apache Spark 會透過 OpenLineage 整合 Data Lineage API,並使用 OpenLineage Spark 外掛程式

您可以透過 Dataplex Universal Catalog 存取歷程資訊,使用歷程圖Data Lineage API。 詳情請參閱「在 Dataplex Universal Catalog 中查看歷程圖」。

可用性

資料沿襲支援 BigQuery 和 Cloud Storage 資料來源,適用於以支援的 Serverless for Apache Spark 執行階段版本執行的工作負載和工作階段,但有下列例外狀況和限制:

  • SparkR、Spark 串流工作負載或工作階段不支援資料歷程。

事前準備

  1. 在 Google Cloud 控制台的專案選擇器頁面中,選取要用於 Serverless for Apache Spark 工作負載或工作階段的專案。

    前往專案選取器

  2. 啟用 Data Lineage API。

    啟用 API

    即將推出的 Spark 資料沿襲異動:請參閱 Serverless for Apache Spark 版本資訊,瞭解即將推出的異動。這項異動會自動為專案、批次工作負載和互動式工作階段提供 Spark 資料沿襲,您不必進行額外的專案、批次工作負載或互動式工作階段設定,只要啟用 Data Lineage API 即可 (請參閱「控管服務的沿襲擷取作業」)。

必要的角色

如果批次工作負載使用預設的 Serverless for Apache Spark 服務帳戶,則該帳戶具有 Dataproc Worker 角色,其中包含資料沿襲所需的權限。

不過,如果批次工作負載使用自訂服務帳戶啟用資料沿襲,您必須將下列段落列出的其中一個角色授予自訂服務帳戶,這些角色包含資料沿襲所需的權限。

如要取得在 Dataproc 中使用資料歷程功能所需的權限,請要求管理員在批次工作負載自訂服務帳戶中,授予下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

您或許也能透過自訂角色或其他預先定義的角色,取得必要權限。

啟用 Spark 資料歷程

您可以為專案、批次工作負載或互動式工作階段啟用 Spark 資料歷程。

在專案層級啟用資料歷程

在專案層級啟用 Spark 資料歷程後,後續在批次工作負載或互動式工作階段中執行的 Spark 工作,都會啟用 Spark 資料歷程。

如要在專案中啟用 Spark 資料歷程,請設定下列自訂專案中繼資料

DATAPROC_LINEAGE_ENABLED true

如要為專案停用 Spark 資料歷程,請將 DATAPROC_LINEAGE_ENABLED 中繼資料設為 false

為 Spark 批次工作負載啟用資料歷程

如要在批次工作負載上啟用資料歷程,請在提交工作負載時,將 spark.dataproc.lineage.enabled 屬性設為 true。這項設定會覆寫專案層級的任何 Spark 資料歷程設定:如果專案層級停用 Spark 資料歷程,但批次工作負載啟用這項功能,則批次工作負載設定的優先順序較高。

這個範例使用 gcloud CLI 提交批次工作負載,並啟用 Spark 沿襲。lineage-example.py

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

下列 lineage-example.py 程式碼會從公開 BigQuery 資料表讀取資料,然後將輸出內容寫入現有 BigQuery 資料集中的新資料表。並使用 Cloud Storage bucket 做為暫時儲存空間。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

更改下列內容:

  • REGION:執行工作負載的區域
  • BUCKET:現有 Cloud Storage bucket 的名稱,用於儲存依附元件
  • PROJECT_IDDATASETTABLE:專案 ID、現有 BigQuery 資料集的名稱,以及要在資料集中建立的新資料表名稱 (資料表不得存在)

您可以在 Dataplex Universal Catalog UI 中查看歷程圖。

Spark 歷程圖

為 Spark 互動工作階段啟用資料歷程

如要在 Spark 互動式工作階段中啟用資料歷程,請在建立工作階段或工作階段範本時,將 spark.dataproc.lineage.enabled 屬性設為 true。這項設定會覆寫專案層級的任何 Spark 資料歷程設定:如果專案層級停用了 Spark 資料歷程,但互動工作階段啟用了這項功能,系統會優先採用互動工作階段設定。

下列 PySpark 筆記本程式碼會設定 Serverless for Apache Spark 互動式工作階段,並啟用 Spark 資料沿襲。接著,系統會建立 Spark Connect 工作階段,在公開的 BigQuery Shakespeare 資料集上執行字詞計數查詢,然後將輸出內容寫入現有 BigQuery 資料集的新資料表 (請參閱「在 BigQuery Studio 記事本中建立 Spark 工作階段」)。

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

更改下列內容:

  • PROJECT_IDDATASETTABLE:專案 ID、現有 BigQuery 資料集的名稱,以及要在資料集中建立的新資料表名稱 (資料表不得存在)

如要查看資料歷程圖,請在 BigQuery「Explorer」頁面的導覽窗格中,點選目的地資料表名稱,然後選取資料表詳細資料窗格中的「歷程」分頁。

Spark 歷程圖

在 Dataplex Universal Catalog 中查看歷程

歷程圖會顯示專案資源與建立這些資源的程序之間的關係。您可以在 Google Cloud 控制台中查看資料歷程資訊,也可以從 Data Lineage API 擷取 JSON 格式的資料歷程資訊。

後續步驟