Spark データリネージの使用

このドキュメントでは、Managed Service for Apache Spark ジョブのデータリネージプロジェクトまたはクラスタレベルで有効にする方法について説明します。

データリネージKnowledge Catalog の機能で、システム内でのデータの移動(データの送信元、データの送信先、データに適用された変換)を追跡できます。

データリネージは、SparkR と Spark ストリーミング ジョブを除くすべての Managed Service for Apache Spark ジョブで使用でき、BigQuery と Cloud Storage のデータソースをサポートしています。これは、Managed Service for Apache Spark 2.0.74+、2.1.22+、2.2.50+、2.3.1+、3.0 のイメージ バージョンに含まれています。

Managed Service for Apache Spark クラスタでこの機能を有効にすると、Managed Service for Apache Spark Spark ジョブはリネージ イベントをキャプチャし、Knowledge Catalog Data Lineage API にパブリッシュします。Managed Service for Apache Spark は、OpenLineage Spark プラグインを使用して OpenLineage を介して Data Lineage API と統合されます。

データリネージの情報には、以下を使用して Knowledge Catalog を介してアクセスできます。

始める前に

  1. Google Cloud コンソールの [プロジェクト セレクタ] ページで、リネージを追跡する Managed Service for Apache Spark クラスタを含むプロジェクトを選択します。

    プロジェクト セレクタに移動

  2. データリネージ API を有効にします。

    API を有効にする

    Spark データリネージの今後の変更: Managed Service for Apache Spark のリリースノートで、Data Lineage API を有効にすると(サービスのリネージ取り込みを制御するを参照)、追加のプロジェクト レベルまたはクラスタ レベルの設定を行わなくても、プロジェクトとクラスタで Spark データリネージが自動的に利用可能になる変更について発表されています。

必要なロール

デフォルトの VM サービス アカウントを使用して Managed Service for Apache Spark クラスタを作成すると、データリネージを有効にする Managed Service for Apache Spark Worker ロールが付与されます。追加のアクションは不要です。

ただし、カスタム サービス アカウントを使用する Managed Service for Apache Spark クラスタを作成し、クラスタでデータリネージを有効にする場合は、次の段落で説明するように、カスタム サービス アカウントに必要なロールを付与する必要があります。

Managed Service for Apache Spark でデータリネージを使用するために必要な権限を取得するには、クラスタのカスタム サービス アカウントに対して次の IAM ロールを付与するよう管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。

Spark データリネージを有効にする

Spark データリネージは、プロジェクト レベルまたはクラスタレベルで有効にできます。

プロジェクト レベルで Spark データリネージを有効にする

プロジェクト レベルで Spark データリネージを有効にすると、プロジェクトの Managed Service for Apache Spark クラスタで実行される後続の Spark ジョブで Spark データリネージが有効になります。

プロジェクト レベルで Spark データリネージを有効にするには、次のカスタム プロジェクト メタデータを設定します。

キー
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform
この VM アクセス スコープの設定は、2.0 イメージ バージョン クラスタでのみ必要です。2.1 以降のイメージ バージョン クラスタでは自動的に設定されます。

DATAPROC_LINEAGE_ENABLED メタデータを false に設定すると、プロジェクト レベルで Spark データリネージを無効にできます。

クラスタレベルで Spark データリネージを有効にする

クラスタの作成時に Spark データリネージを有効にすると、Managed Service for Apache Spark クラスタで実行されるサポートされている Spark ジョブで Spark データリネージが有効になります。この設定は、プロジェクト レベルの Spark データリネージ設定をオーバーライドします。プロジェクト レベルで Spark データリネージが無効になっていても、クラスタレベルで有効になっている場合、クラスタレベルが優先され、クラスタで実行されるサポートされている Spark ジョブでデータリネージが有効になります。

クラスタで Spark データリネージを有効にするには、dataproc:dataproc.lineage.enabled クラスタ プロパティを true に設定して Managed Service for Apache Spark クラスタを作成します。

gcloud CLI の例:

gcloud dataproc clusters create CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --properties 'dataproc:dataproc.lineage.enabled=true'

クラスタの作成時に dataproc:dataproc.lineage.enabled プロパティを false に設定すると、クラスタの Spark データリネージを無効にできます。

  • クラスタでデータリネージを無効にする: リネージを無効にしてクラスタを作成するには、dataproc:dataproc.lineage.enabled=false を設定します。クラスタの作成後に、クラスタの Spark データリネージを無効にすることはできません。既存のクラスタで Spark データリネージを無効にするには、dataproc:dataproc.lineage.enabled プロパティを false に設定してクラスタを再作成します。

  • 2.0 イメージ バージョン クラスタのスコープを設定する: Spark データリネージには、Managed Service for Apache Spark クラスタ VM アクセスの cloud-platform スコープが必要です。イメージ バージョン 2.1 以降で作成された Managed Service for Apache Spark イメージ バージョン クラスタでは、cloud-platform が有効になっています。クラスタの作成時に Managed Service for Apache Spark イメージ バージョン 2.0 を指定する場合は、スコープを cloud-platform に設定します。

ジョブで Spark データリネージを無効にする

クラスタで Spark データリネージが有効になっている場合は、ジョブの送信時に空の値("")で spark.extraListeners プロパティを渡すことで、ジョブで Spark データリネージを無効にできます。

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.extraListeners=''

Spark ジョブの送信

Spark データリネージを有効にして作成された Managed Service for Apache Spark クラスタで サポートされている Spark ジョブを送信すると、Managed Service for Apache Spark はデータリネージ情報をキャプチャして Data Lineage API に報告します。

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --project PROJECT_ID \
    --region REGION \
    --class CLASS \
    --jars=gs://APPLICATION_BUCKET/spark-application.jar \
    --properties=spark.openlineage.namespace=CUSTOM_NAMESPACE,spark.openlineage.appName=CUSTOM_APPNAME

注:

  • ジョブを一意に識別するために使用される spark.openlineage.namespace プロパティと spark.openlineage.appName プロパティの追加は省略可能です。これらのプロパティを追加しない場合、Managed Service for Apache Spark は次のデフォルト値を使用します。
    • spark.openlineage.namespace のデフォルト値: PROJECT_ID
    • spark.openlineage.appName のデフォルト値: spark.app.name

Knowledge Catalog でリネージを表示する

リネージグラフには、プロジェクト リソースとそれらを作成したプロセスの関係が表示されます。 Google Cloud コンソールでデータリネージ情報を表示することも、Data Lineage API から JSON データの形式で取得することもできます。

PySpark のサンプルコード:

次の PySpark ジョブは、一般公開の BigQuery テーブルからデータを読み取り、出力を既存の BigQuery データセットの新しいテーブルに書き込みます。一時ストレージには Cloud Storage バケットを使用します。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

bucket = 'gs://BUCKET`
spark.conf.set('temporaryCloudStorageBucket', bucket)

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

次のように置き換えます。

  • BUCKET: 既存の Cloud Storage バケットの名前

  • PROJECT_IDDATASETTABLE: プロジェクト ID、既存の BigQuery データセットの名前、データセットに作成する新しいテーブルの名前(テーブルが存在しない場合)

リネージグラフは、Knowledge Catalog UI で表示できます。

リネージグラフの例

次のステップ