使用 Spark 資料歷程

本文說明如何在專案叢集層級,為 Managed Service for Apache Spark Spark 工作啟用資料歷程

資料歷程Dataplex Universal Catalog 的功能,可追蹤資料在系統中的移動方式,包括資料來源、傳遞至何處,以及套用的轉換。

除了 SparkR 和 Spark 串流工作外,所有 Managed Service for Apache Spark Spark 工作都提供資料沿襲功能,並支援 BigQuery 和 Cloud Storage 資料來源。這項功能已納入 Managed Service for Apache Spark 2.0.74+、2.1.22+、2.2.50+、2.3.1+ 和 3.0 版映像檔。

在 Managed Service for Apache Spark 叢集中啟用這項功能後,Managed Service for Apache Spark Spark 工作會擷取資料歷程事件,並發布至 Dataplex Universal Catalog Data Lineage API。Managed Service for Apache Spark 會透過 OpenLineage 與 Data Lineage API 整合,並使用 OpenLineage Spark 外掛程式

您可以透過 Dataplex Universal Catalog,使用下列方式存取資料歷程資訊:

事前準備

  1. 在 Google Cloud 控制台的專案選取器頁面中,選取包含要追蹤歷程之 Managed Service for Apache Spark 叢集的專案。

    前往專案選取器

  2. 啟用 Data Lineage API。

    啟用 API

    即將推出的 Spark 資料沿襲變更:請參閱 Managed Service for Apache Spark 版本資訊,瞭解即將推出的變更。這項變更會自動為專案和叢集提供 Spark 資料沿襲功能,您只需啟用 Data Lineage API (請參閱「控管服務的沿襲擷取作業」),不必進行額外的專案或叢集層級設定。

必要的角色

如果您使用預設 VM 服務帳戶建立 Managed Service for Apache Spark 叢集,該叢集會具備 Managed Service for Apache Spark Worker 角色,因此可啟用資料歷程功能。你不需要採取其他動作。

不過,如果您建立使用自訂服務帳戶的 Apache Spark 代管服務叢集,則必須將必要角色授予自訂服務帳戶,才能在叢集上啟用資料沿襲功能,詳情請參閱下段說明。

如要取得使用 Managed Service for Apache Spark 資料歷程功能所需的權限,請要求系統管理員在叢集的自訂服務帳戶中,授予下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

您或許也能透過自訂角色或其他預先定義的角色,取得必要權限。

啟用 Spark 資料歷程

您可以在專案或叢集層級啟用 Spark 資料歷程。

在專案層級啟用 Spark 資料歷程

在專案層級啟用 Spark 資料歷程後,專案中 Managed Service for Apache Spark 叢集上執行的後續 Spark 工作,都會啟用 Spark 資料歷程。

如要在專案層級啟用 Spark 資料歷程,請設定下列自訂專案中繼資料

DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform
只有 2.0 映像檔版本叢集才需要設定這個 VM 存取權範圍。在 2.1 以上版本映像檔叢集,這項設定會自動啟用。

您可以將 DATAPROC_LINEAGE_ENABLED 中繼資料設為 false,在專案層級停用 Spark 資料沿襲。

在叢集層級啟用 Spark 資料歷程

如果您在建立叢集時啟用 Spark 資料歷程,在 Managed Service for Apache Spark 叢集上執行的支援 Spark 工作,就會啟用 Spark 資料歷程。這項設定會覆寫專案層級的任何 Spark 資料歷程設定:如果專案層級已停用 Spark 資料歷程,但叢集層級已啟用,則以叢集層級為優先,且在叢集上執行的支援 Spark 工作會啟用資料歷程。

如要在叢集上啟用 Spark 資料歷程,請建立 Managed Service for Apache Spark 叢集,並將 dataproc:dataproc.lineage.enabled 叢集屬性設為 true

gcloud CLI 範例:

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

如要在叢集上停用 Spark 資料歷程,請在建立叢集時,將 dataproc:dataproc.lineage.enabled 屬性設為 false

  • 在叢集上停用資料歷程:如要建立停用歷程的叢集,請設定 dataproc:dataproc.lineage.enabled=false。叢集建立後,您就無法在叢集上停用 Spark 資料沿襲。如要在現有叢集上停用 Spark 資料歷程,可以重新建立叢集,並將 dataproc:dataproc.lineage.enabled 屬性設為 false

  • 在 2.0 映像檔版本叢集上設定範圍:Managed Service for Apache Spark 叢集 VM 存取權 cloud-platform 範圍 是 Spark 資料歷程的必要條件。如果使用 2.1 以上版本的映像檔建立 Managed Service for Apache Spark 映像檔版本叢集,系統會啟用 cloud-platform。如果您在建立叢集時指定 Managed Service for Apache Spark 映像檔版本 2.0,請將範圍設為 cloud-platform

在工作中停用 Spark 資料歷程

如果叢集已啟用 Spark 資料歷程,您可以在提交工作時傳遞 spark.extraListeners 屬性,並將值設為空白 (""),藉此停用工作中的 Spark 資料歷程。

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.extraListeners=''

提交 Spark 工作

在已啟用 Spark 資料歷程的 Managed Service for Apache Spark 叢集上提交支援的 Spark 工作時,Managed Service for Apache Spark 會擷取資料歷程資訊,並向 Data Lineage API 回報。

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

注意:

  • 新增 spark.openlineage.namespacespark.openlineage.appName 屬性 (用於識別工作) 為選用步驟。如果您未新增這些屬性,Managed Service for Apache Spark 會使用下列預設值:
    • spark.openlineage.namespace 的預設值:PROJECT_ID
    • spark.openlineage.appName 的預設值:spark.app.name

在 Dataplex Universal Catalog 中查看歷程

歷程圖會顯示專案資源與建立這些資源的程序之間的關係。您可以在 Google Cloud 控制台中查看資料歷程資訊,也可以從 Data Lineage API 擷取 JSON 格式的資料歷程資訊。

PySpark 程式碼範例:

下列 PySpark 工作會從公開 BigQuery 資料表讀取資料,然後將輸出內容寫入現有 BigQuery 資料集中的新資料表。並使用 Cloud Storage bucket 做為暫時儲存空間。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

請替換下列項目:

  • BUCKET:現有 Cloud Storage bucket 的名稱

  • PROJECT_IDDATASETTABLE:專案 ID、現有 BigQuery 資料集的名稱,以及要在資料集中建立的新資料表名稱 (資料表不得存在)

您可以在 Dataplex Universal Catalog UI 中查看歷程圖。

歷程圖範例

後續步驟