使用 Spark 資料歷程

本文說明如何在專案叢集層級,為 Managed Service for Apache Spark 工作啟用資料歷程

資料歷程Knowledge Catalog 的功能,可追蹤資料在系統中的移動情形,包括來源、傳遞目的地和採用的轉換作業。

資料沿襲適用於所有 Managed Service for Apache Spark 工作 (SparkR 和 Spark 串流工作除外),並支援 BigQuery 和 Cloud Storage 資料來源。這項功能隨附於 Managed Service for Apache Spark 2.0.74+、2.1.22+、2.2.50+、2.3.1+ 和 3.0 映像檔版本。

在 Managed Service for Apache Spark 叢集中啟用這項功能後,Managed Service for Apache Spark Spark 工作就會擷取資料沿襲事件,並發布至 Knowledge Catalog Data Lineage API。Managed Service for Apache Spark 會透過 OpenLineage 與 Data Lineage API 整合,並使用 OpenLineage Spark 外掛程式

您可以透過 Knowledge Catalog 存取資料歷程資訊,方法如下:

事前準備

  1. 在 Google Cloud 控制台的專案選取器頁面中,選取包含要追蹤歷程之 Managed Service for Apache Spark 叢集的專案。

    前往專案選取器

  2. 啟用 Data Lineage API。

    啟用 API

    即將推出的 Spark 資料沿襲變更:請參閱 Managed Service for Apache Spark 版本資訊,瞭解即將推出的變更。這項變更會自動為專案和叢集提供 Spark 資料沿襲,您只要啟用 Data Lineage API (請參閱「控管服務的沿襲擷取作業」),不必進行額外的專案或叢集層級設定。

必要的角色

如果您使用預設 VM 服務帳戶建立 Managed Service for Apache Spark 叢集,該帳戶會具備 Managed Service for Apache Spark Worker 角色,因此可啟用資料歷程功能。你不需要採取其他動作。

不過,如果您建立的 Managed Service for Apache Spark 叢集使用自訂服務帳戶,則必須將必要角色授予自訂服務帳戶,才能在叢集上啟用資料沿襲功能,詳情請參閱下一個段落。

如要取得使用 Managed Service for Apache Spark 資料歷程功能所需的權限,請要求管理員在叢集的自訂服務帳戶中,授予下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和組織的存取權」。

您或許也能透過自訂角色或其他預先定義的角色,取得必要權限。

啟用 Spark 資料歷程

您可以在專案或叢集層級啟用 Spark 資料歷程。

在專案層級啟用 Spark 資料歷程

在專案層級啟用 Spark 資料歷程後,專案中 Managed Service for Apache Spark 叢集上執行的後續 Spark 工作,都會啟用 Spark 資料歷程。

如要在專案層級啟用 Spark 資料歷程,請設定下列自訂專案中繼資料

DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform
只有 2.0 映像檔版本叢集需要設定這個 VM 存取範圍2.1 以上版本的映像檔叢集會自動設定這個範圍。

您可以將 DATAPROC_LINEAGE_ENABLED 中繼資料設為 false,在專案層級停用 Spark 資料沿襲。

在叢集層級啟用 Spark 資料歷程

如果您在建立叢集時啟用 Spark 資料歷程,在 Managed Service for Apache Spark 叢集上執行的支援 Spark 工作,就會啟用 Spark 資料歷程。這項設定會覆寫專案層級的任何 Spark 資料歷程設定:如果專案層級的 Spark 資料歷程已停用,但叢集層級已啟用,則以叢集層級為優先,在叢集上執行的支援 Spark 工作會啟用資料歷程。

如要在叢集上啟用 Spark 資料歷程,請建立 Managed Service for Apache Spark 叢集,並將 dataproc:dataproc.lineage.enabled 叢集屬性設為 true

gcloud CLI 範例:

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

如要在叢集上停用 Spark 資料歷程,請在建立叢集時,將 dataproc:dataproc.lineage.enabled 屬性設為 false

  • 在叢集上停用資料歷程:如要建立已停用歷程的叢集,請設定 dataproc:dataproc.lineage.enabled=false。叢集建立後,您就無法在叢集上停用 Spark 資料歷程。如要在現有叢集上停用 Spark 資料歷程,可以重新建立叢集,並將 dataproc:dataproc.lineage.enabled 屬性設為 false

  • 在 2.0 版映像檔叢集上設定範圍:Spark 資料沿襲需要 Managed Service for Apache Spark 叢集 VM 存取權 cloud-platform 範圍。使用 2.1 以上版本映像檔建立的 Managed Service for Apache Spark 映像檔版本叢集已啟用 cloud-platform。如果您在建立叢集時指定 Managed Service for Apache Spark 映像檔版本 2.0,請將範圍設為 cloud-platform

在工作中停用 Spark 資料歷程

如果叢集已啟用 Spark 資料歷程,您可以在提交工作時傳遞 spark.extraListeners 屬性,並將值設為空白 (""),藉此停用工作中的 Spark 資料歷程。

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.extraListeners=''

提交 Spark 工作

在啟用 Spark 資料歷程建立的 Managed Service for Apache Spark 叢集上提交支援的 Spark 工作時,Managed Service for Apache Spark 會擷取資料歷程資訊,並向 Data Lineage API 回報。

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

注意:

  • 新增 spark.openlineage.namespacespark.openlineage.appName 屬性 (用於識別工作) 為選用步驟。如果您未新增這些屬性,Managed Service for Apache Spark 會使用下列預設值:
    • spark.openlineage.namespace 的預設值:PROJECT_ID
    • spark.openlineage.appName 的預設值:spark.app.name

在 Knowledge Catalog 中查看歷程

歷程圖會顯示專案資源與建立這些資源的程序之間的關係。您可以在 Google Cloud 控制台中查看資料歷程資訊,也可以從 Data Lineage API 擷取 JSON 格式的資料歷程資訊。

PySpark 程式碼範例:

下列 PySpark 工作會從公開 BigQuery 資料表讀取資料,然後將輸出內容寫入現有 BigQuery 資料集的新資料表。這項工作會使用 Cloud Storage bucket 儲存暫時資料。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

請將下列項目改為對應的值:

  • BUCKET:現有 Cloud Storage bucket 的名稱

  • PROJECT_IDDATASETTABLE:專案 ID、現有 BigQuery 資料集的名稱,以及要在資料集中建立的新資料表名稱 (資料表不得存在)

您可以在 Knowledge Catalog UI 中查看歷程圖。

歷程圖範例

後續步驟