このドキュメントでは、Managed Service for Apache Spark のバッチ ワークロードとインタラクティブ セッションで、プロジェクト、バッチ ワークロード、インタラクティブ セッションのいずれかのレベルでデータリネージを有効にする方法について説明します。
概要
データリネージは Knowledge Catalog の機能で、システム内でのデータの移動(データの送信元、データの送信先、データに適用された変換)を追跡できます。
Managed Service for Apache Spark のワークロードとセッションは、リネージ イベントをキャプチャして、Knowledge Catalog の Data Lineage API にパブリッシュします。Managed Service for Apache Spark は、OpenLineage Spark プラグインを使用して OpenLineage を介して Data Lineage API と統合されます。
Knowledge Catalog でリネージ情報にアクセスするには、リネージグラフと Data Lineage API を使用します。詳細については、Knowledge Catalog でリネージグラフを表示するをご覧ください。
対象
BigQuery と Cloud Storage のデータソースをサポートするデータリネージは、サポートされている Managed Service for Apache Spark ランタイム バージョンで実行されているワークロードとセッションで使用できますが、次の例外と制限があります。
- データリネージは、SparkR または Spark ストリーミング ワークロードやセッションでは使用できません。
始める前に
Google Cloud コンソールのプロジェクト選択ツールで、Managed Service for Apache Spark のワークロードまたはセッションに使用するプロジェクトを選択します。
データリネージ API を有効にします。
Spark データ リネージの今後の変更: Data Lineage API を有効にすると(サービスのリネージ取り込みを制御するをご覧ください)、追加のプロジェクト、バッチ ワークロード、インタラクティブ セッションの設定を必要とせずに、Spark データ リネージをプロジェクト、バッチ ワークロード、インタラクティブ セッションで自動的に使用できるようになる変更については、Managed Service for Apache Spark のリリースノートをご覧ください。
必要なロール
バッチ ワークロードでデフォルトの Managed Service for Apache Spark サービス アカウントを使用している場合、データ リネージに必要な権限を含む Managed Service for Apache Spark Worker ロールが付与されています。
ただし、バッチ ワークロードでカスタム サービス アカウントを使用してデータリネージを有効にする場合は、データリネージに必要な権限を含む、次の段落に記載されているロールのいずれかをカスタム サービス アカウントに付与する必要があります。
Managed Service for Apache Spark でデータリネージを使用するために必要な権限を取得するには、バッチ ワークロードのカスタム サービス アカウントに対して次の IAM ロールを付与するよう管理者に依頼してください。
-
次のいずれかのロールを付与します。
-
Managed Service for Apache Spark Worker (
roles/dataproc.worker) -
データリネージ編集者 (
roles/datalineage.editor) -
データリネージ プロデューサー (
roles/datalineage.producer) -
データリネージ管理者 (
roles/datalineage.admin)
-
Managed Service for Apache Spark Worker (
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。
Spark データリネージを有効にする
Spark データリネージは、プロジェクト、バッチ ワークロード、インタラクティブ セッションで有効にできます。
プロジェクト レベルでデータリネージを有効にする
プロジェクト レベルで Spark データリネージを有効にすると、バッチ ワークロードまたはインタラクティブ セッションで実行される後続の Spark ジョブで Spark データリネージが有効になります。
プロジェクトで Spark データリネージを有効にするには、次のカスタム プロジェクト メタデータを設定します。
| キー | 値 |
|---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_LINEAGE_ENABLED メタデータを false に設定すると、プロジェクトの Spark データリネージを無効にできます。
Spark バッチ ワークロードでデータリネージを有効にする
バッチ ワークロードでデータリネージを有効にするには、ワークロードを送信するときに spark.dataproc.lineage.enabled プロパティを true に設定します。この設定は、プロジェクト レベルの Spark データリネージの設定をオーバーライドします。プロジェクト レベルで Spark データリネージが無効になっていても、バッチ ワークロードで有効になっている場合は、バッチ ワークロードの設定が優先されます。
Spark バッチ ワークロードの Spark データリネージを無効にするには、ワークロードを送信するときに spark.dataproc.lineage.enabled プロパティを false に設定します。
この例では、gcloud CLI を使用して、Spark リネージが有効になっているバッチ lineage-example.py ワークロードを送信します。
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
次の lineage-example.py コードは、一般公開の BigQuery テーブルからデータを読み取り、出力を既存の BigQuery データセットの新しいテーブルに書き込みます。一時ストレージには Cloud Storage バケットを使用します。
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
次のように置き換えます。
- REGION: ワークロードを実行するリージョン
- BUCKET: 依存関係を保存する既存の Cloud Storage バケットの名前
- PROJECT_ID、DATASET、TABLE: プロジェクト ID、既存の BigQuery データセットの名前、データセットに作成する新しいテーブルの名前(テーブルが存在しない場合)
リネージグラフは、Knowledge Catalog UI で表示できます。
Spark インタラクティブ セッションまたはセッション テンプレートでデータリネージを有効にする
Spark インタラクティブ セッションまたはセッション テンプレートでデータリネージを有効にするには、セッションまたはセッション テンプレートを作成するときに、spark.dataproc.lineage.enabled プロパティを true に設定します。この設定は、プロジェクト レベルの Spark データリネージ設定をオーバーライドします。プロジェクト レベルで Spark データリネージが無効になっていても、インタラクティブ セッションで有効になっている場合は、インタラクティブ セッションの設定が優先されます。
Spark インタラクティブ セッションまたはセッション テンプレートで Spark データリネージを無効にするには、インタラクティブ セッションまたはセッション テンプレートを作成するときに spark.dataproc.lineage.enabled プロパティを false に設定します。
次の PySpark ノートブック コードは、Spark データ リネージが有効になっている Managed Service for Apache Spark インタラクティブ セッションを構成します。次に、一般公開の BigQuery Shakespeare データセットでワードカウント クエリを実行し、既存の BigQuery データセットの新しいテーブルに出力を書き込む Spark Connect セッションを作成します(BigQuery Studio ノートブックで Spark セッションを作成するをご覧ください)。
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
次のように置き換えます。
- PROJECT_ID、DATASET、TABLE: プロジェクト ID、既存の BigQuery データセットの名前、データセットに作成する新しいテーブルの名前(テーブルが存在しない場合)
データ リネージ グラフを表示するには、BigQuery の [エクスプローラ] ページのナビゲーション パネルに表示されている宛先テーブル名をクリックし、テーブルの詳細パネルで [リネージ] タブを選択します。
Knowledge Catalog でリネージを表示する
リネージグラフには、プロジェクト リソースとそれらを作成したプロセスの関係が表示されます。 Google Cloud コンソールでデータリネージ情報を表示することも、Data Lineage API から JSON データとして取得することもできます。
次のステップ
- データリネージの詳細を学習する。
- インタラクティブ ラボでデータリネージを試す: Dataplex でデータリネージと OpenLineage を使用してデータ更新をキャプチャして探索する。