在 Dataproc 中使用 Spark 資料歷程

本文說明如何在專案叢集層級,為 Dataproc Spark 工作啟用資料歷程

資料歷程Dataplex Universal Catalog 的功能,可讓您追蹤資料在系統之間的移動情形,包括來源、傳遞目的地和採用的轉換機制。

資料沿襲適用於所有 Dataproc Spark 工作 (SparkR 和 Spark 串流工作除外),並支援 BigQuery 和 Cloud Storage 資料來源。Dataproc on Compute Engine 2.0.74+、2.1.22+、2.2.50+、2.3.1+ 和 3.0 映像檔版本均內含此功能。

在 Dataproc 叢集中啟用這項功能後,Dataproc Spark 工作會擷取資料歷程事件,並發布至 Dataplex Universal Catalog Data Lineage API。Dataproc 會透過 OpenLineage 與 Data Lineage API 整合,並使用 OpenLineage Spark 外掛程式

您可以透過 Dataplex Universal Catalog,使用下列方式存取資料歷程資訊:

事前準備

  1. 在 Google Cloud 控制台的專案選取器頁面中,選取包含要追蹤歷程之 Dataproc 叢集的專案。

    前往專案選取器

  2. 啟用 Data Lineage API。

    啟用 API

    即將推出的 Spark 資料歷程變更:請參閱 Dataproc 版本資訊,瞭解相關公告。啟用 Data Lineage API (請參閱「控管服務的歷程資料擷取作業」) 後,系統會自動為專案和叢集提供 Spark 資料歷程,不需額外設定專案或叢集層級。

必要的角色

如果您使用預設 VM 服務帳戶建立 Dataproc 叢集,該帳戶會具備 Dataproc Worker 角色,可啟用資料歷程功能。你不需要採取其他動作。

不過,如果您建立的 Dataproc 叢集使用自訂服務帳戶,則必須將必要角色授予自訂服務帳戶,才能在叢集上啟用資料歷程功能,詳情請參閱下一個段落。

如要取得在 Dataproc 中使用資料歷程功能所需的權限,請要求管理員在叢集的自訂服務帳戶中,授予下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

您或許也能透過自訂角色或其他預先定義的角色,取得必要權限。

啟用 Spark 資料歷程

您可以在專案或叢集層級啟用 Spark 資料歷程。

在專案層級啟用 Spark 資料歷程

在專案層級啟用 Spark 資料歷程後,專案中 Dataproc 叢集上執行的後續 Spark 工作都會啟用 Spark 資料歷程。

如要在專案層級啟用 Spark 資料歷程,請設定下列自訂專案中繼資料

DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform
只有 2.0 映像檔版本叢集才需要設定這個 VM 存取權範圍。系統會自動為 2.1 以上版本映像檔叢集設定這項屬性。

您可以將 DATAPROC_LINEAGE_ENABLED 中繼資料設為 false,在專案層級停用 Spark 資料沿襲。

在叢集層級啟用 Spark 資料歷程

如果您在建立叢集時啟用 Spark 資料歷程,在 Dataproc 叢集上執行的支援 Spark 工作就會啟用 Spark 資料歷程。這項設定會覆寫專案層級的任何 Spark 資料歷程設定:如果專案層級已停用 Spark 資料歷程,但叢集層級已啟用,則以叢集層級為優先,且在叢集上執行的支援 Spark 工作會啟用資料歷程。

如要在叢集上啟用 Spark 資料歷程,請建立 Dataproc 叢集,並將 dataproc:dataproc.lineage.enabled 叢集屬性設為 true

gcloud CLI 範例:

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

如要在叢集上停用 Spark 資料歷程,請在建立叢集時,將 dataproc:dataproc.lineage.enabled 屬性設為 false

  • 在叢集上停用資料歷程:如要建立停用歷程的叢集,請設定 dataproc:dataproc.lineage.enabled=false。叢集建立後,您就無法在叢集上停用 Spark 資料沿襲功能。如要在現有叢集上停用 Spark 資料歷程,可以重新建立叢集,並將 dataproc:dataproc.lineage.enabled 屬性設為 false

  • 在 2.0 映像檔版本叢集上設定範圍:Spark 資料歷程需要 Dataproc 叢集 VM 存取權 cloud-platform 範圍。使用映像檔版本 2.1 以上版本建立的 Dataproc 映像檔版本叢集,會啟用 cloud-platform。如果您在建立叢集時指定 Dataproc 映像檔版本 2.0,請將範圍設為 cloud-platform

在工作中停用 Spark 資料歷程

如果叢集已啟用 Spark 資料歷程,您可以在提交工作時傳遞 spark.extraListeners 屬性,並將值設為空白 (""),藉此停用工作中的 Spark 資料歷程。

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.extraListeners=''

提交 Spark 工作

在已啟用 Spark 資料歷程的 Dataproc 叢集上提交支援的 Spark 工作時,Dataproc 會擷取資料歷程資訊,並向 Data Lineage API 回報。

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

注意:

  • 新增 spark.openlineage.namespacespark.openlineage.appName 屬性 (用於識別工作) 為選用步驟。如未新增這些屬性,Dataproc 會使用下列預設值:
    • spark.openlineage.namespace 的預設值:PROJECT_ID
    • spark.openlineage.appName 的預設值:spark.app.name

在 Dataplex Universal Catalog 中查看歷程

歷程圖會顯示專案資源與建立這些資源的程序之間的關係。您可以在 Google Cloud 控制台中查看資料歷程資訊,也可以從 Data Lineage API 擷取 JSON 格式的資料歷程資訊。

PySpark 程式碼範例:

下列 PySpark 工作會從公開 BigQuery 資料表讀取資料,然後將輸出內容寫入現有 BigQuery 資料集中的新資料表。並使用 Cloud Storage bucket 做為暫時儲存空間。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

請替換下列項目:

  • BUCKET:現有 Cloud Storage bucket 的名稱

  • PROJECT_IDDATASETTABLE:專案 ID、現有 BigQuery 資料集的名稱,以及要在資料集中建立的新資料表名稱 (資料表不得存在)

您可以在 Dataplex Universal Catalog UI 中查看歷程圖。

歷程圖範例

後續步驟