本文說明如何在專案或叢集層級,為 Dataproc Spark 工作啟用資料歷程。
資料歷程是 Dataplex Universal Catalog 的功能,可讓您追蹤資料在系統之間的移動情形,包括來源、傳遞目的地和採用的轉換機制。
資料沿襲適用於所有 Dataproc Spark 工作 (SparkR 和 Spark 串流工作除外),並支援 BigQuery 和 Cloud Storage 資料來源。Dataproc on Compute Engine 2.0.74+、2.1.22+、2.2.50+、2.3.1+ 和 3.0 映像檔版本均內含此功能。
在 Dataproc 叢集中啟用這項功能後,Dataproc Spark 工作會擷取資料歷程事件,並發布至 Dataplex Universal Catalog Data Lineage API。Dataproc 會透過 OpenLineage 與 Data Lineage API 整合,並使用 OpenLineage Spark 外掛程式。
您可以透過 Dataplex Universal Catalog,使用下列方式存取資料歷程資訊:
事前準備
在 Google Cloud 控制台的專案選取器頁面中,選取包含要追蹤歷程之 Dataproc 叢集的專案。
啟用 Data Lineage API。
即將推出的 Spark 資料歷程變更:請參閱 Dataproc 版本資訊,瞭解相關公告。啟用 Data Lineage API (請參閱「控管服務的歷程資料擷取作業」) 後,系統會自動為專案和叢集提供 Spark 資料歷程,不需額外設定專案或叢集層級。
必要的角色
如果您使用預設 VM 服務帳戶建立 Dataproc 叢集,該帳戶會具備 Dataproc Worker 角色,可啟用資料歷程功能。你不需要採取其他動作。
不過,如果您建立的 Dataproc 叢集使用自訂服務帳戶,則必須將必要角色授予自訂服務帳戶,才能在叢集上啟用資料歷程功能,詳情請參閱下一個段落。
如要取得在 Dataproc 中使用資料歷程功能所需的權限,請要求管理員在叢集的自訂服務帳戶中,授予下列 IAM 角色:
-
授予下列其中一個角色:
-
Dataproc Worker (
roles/dataproc.worker) -
資料歷程編輯者 (
roles/datalineage.editor) -
資料歷程產生者 (
roles/datalineage.producer) -
資料歷程管理員 (
roles/datalineage.admin)
-
Dataproc Worker (
如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。
啟用 Spark 資料歷程
您可以在專案或叢集層級啟用 Spark 資料歷程。
在專案層級啟用 Spark 資料歷程
在專案層級啟用 Spark 資料歷程後,專案中 Dataproc 叢集上執行的後續 Spark 工作都會啟用 Spark 資料歷程。
如要在專案層級啟用 Spark 資料歷程,請設定下列自訂專案中繼資料:
| 鍵 | 值 |
|---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform只有 2.0 映像檔版本叢集才需要設定這個 VM 存取權範圍。系統會自動為 2.1 以上版本映像檔叢集設定這項屬性。 |
您可以將 DATAPROC_LINEAGE_ENABLED 中繼資料設為 false,在專案層級停用 Spark 資料沿襲。
在叢集層級啟用 Spark 資料歷程
如果您在建立叢集時啟用 Spark 資料歷程,在 Dataproc 叢集上執行的支援 Spark 工作就會啟用 Spark 資料歷程。這項設定會覆寫專案層級的任何 Spark 資料歷程設定:如果專案層級已停用 Spark 資料歷程,但叢集層級已啟用,則以叢集層級為優先,且在叢集上執行的支援 Spark 工作會啟用資料歷程。
如要在叢集上啟用 Spark 資料歷程,請建立 Dataproc 叢集,並將 dataproc:dataproc.lineage.enabled 叢集屬性設為 true。
gcloud CLI 範例:
gcloud dataproc clusters create CLUSTER_NAME \
--project PROJECT_ID \
--region REGION \
--properties 'dataproc:dataproc.lineage.enabled=true'如要在叢集上停用 Spark 資料歷程,請在建立叢集時,將 dataproc:dataproc.lineage.enabled 屬性設為 false。
在叢集上停用資料歷程:如要建立停用歷程的叢集,請設定
dataproc:dataproc.lineage.enabled=false。叢集建立後,您就無法在叢集上停用 Spark 資料沿襲功能。如要在現有叢集上停用 Spark 資料歷程,可以重新建立叢集,並將dataproc:dataproc.lineage.enabled屬性設為false。在 2.0 映像檔版本叢集上設定範圍:Spark 資料歷程需要 Dataproc 叢集 VM 存取權
cloud-platform範圍。使用映像檔版本2.1以上版本建立的 Dataproc 映像檔版本叢集,會啟用cloud-platform。如果您在建立叢集時指定 Dataproc 映像檔版本2.0,請將範圍設為cloud-platform。
在工作中停用 Spark 資料歷程
如果叢集已啟用 Spark 資料歷程,您可以在提交工作時傳遞 spark.extraListeners 屬性,並將值設為空白 (""),藉此停用工作中的 Spark 資料歷程。
gcloud dataproc jobs submit spark \
--cluster=CLUSTER_NAME \
--project PROJECT_ID \
--region REGION \
--class CLASS \
--jars=gs://APPLICATION_BUCKET/spark-application.jar \
--properties=spark.extraListeners=''提交 Spark 工作
在已啟用 Spark 資料歷程的 Dataproc 叢集上提交支援的 Spark 工作時,Dataproc 會擷取資料歷程資訊,並向 Data Lineage API 回報。
gcloud dataproc jobs submit spark \
--cluster=CLUSTER_NAME \
--project PROJECT_ID \
--region REGION \
--class CLASS \
--jars=gs://APPLICATION_BUCKET/spark-application.jar \
--properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME注意:
- 新增
spark.openlineage.namespace和spark.openlineage.appName屬性 (用於識別工作) 為選用步驟。如未新增這些屬性,Dataproc 會使用下列預設值:spark.openlineage.namespace的預設值:PROJECT_IDspark.openlineage.appName的預設值:spark.app.name
在 Dataplex Universal Catalog 中查看歷程
歷程圖會顯示專案資源與建立這些資源的程序之間的關係。您可以在 Google Cloud 控制台中查看資料歷程資訊,也可以從 Data Lineage API 擷取 JSON 格式的資料歷程資訊。
PySpark 程式碼範例:
下列 PySpark 工作會從公開 BigQuery 資料表讀取資料,然後將輸出內容寫入現有 BigQuery 資料集中的新資料表。並使用 Cloud Storage bucket 做為暫時儲存空間。
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
請替換下列項目:
BUCKET:現有 Cloud Storage bucket 的名稱
PROJECT_ID、DATASET 和 TABLE:專案 ID、現有 BigQuery 資料集的名稱,以及要在資料集中建立的新資料表名稱 (資料表不得存在)
您可以在 Dataplex Universal Catalog UI 中查看歷程圖。
後續步驟
- 進一步瞭解資料歷程。