このドキュメントでは、 データリネージを Managed Service for Apache Spark バッチ ワークロードとインタラクティブ セッションで、 プロジェクト、 バッチ ワークロード、または インタラクティブ セッションのレベルで有効にする方法について説明します。
概要
データリネージは Knowledge Catalog の機能で、システム内でのデータの移動(データの送信元、データの送信先、データに適用された変換)を追跡できます。
Managed Service for Apache Spark ワークロードとセッションは、リネージ イベントをキャプチャして Knowledge Catalog Data Lineage API にパブリッシュします。 Managed Service for Apache Spark は、 OpenLineage を介して Data Lineage API と統合され、 OpenLineage Spark プラグインを使用します。
リネージ情報にアクセスするには、Knowledge Catalog で リネージグラフ と Data Lineage APIを使用します。 詳細については、Knowledge Catalog でリネージグラフを表示するをご覧ください。
対象
BigQuery と Cloud Storage データソースをサポートするデータリネージは、 サポートされている Managed Service for Apache Spark ランタイム バージョンで実行されるワークロードとセッションで使用できますが、 次の例外と制限があります。
- SparkR または Spark ストリーミングのワークロードまたはセッションでは、データリネージを使用できません。
始める前に
コンソールのプロジェクト セレクタのページで、Managed Service for Apache Spark ワークロードまたはセッションに使用するプロジェクトを選択します。 Google Cloud
データリネージ API を有効にします。
今後の Spark データリネージの変更 Managed Service for Apache Spark リリースノート で、データリネージ API を有効にすると( サービスの系統取り込みを制御する を参照)、追加のプロジェクト、バッチ ワークロード、インタラクティブ セッションの設定を行わなくても、プロジェクト、バッチ ワークロード、インタラクティブ セッションで Spark データリネージを自動的に使用できるようになる変更についてお知らせします。
必要なロール
バッチ ワークロードで
デフォルトの Managed Service for Apache Spark サービス アカウントを使用している場合、
Managed Service for Apache Spark Worker
ロールが付与されます。このロールには、データリネージに必要な権限が含まれています。
ただし、バッチ ワークロードでカスタム サービス アカウントを使用してデータリネージを有効にする場合は、次の段落に記載されているロールのいずれか をカスタム サービス アカウントに付与する必要があります。これらのロールには、データリネージに必要な権限が含まれています。
Managed Service for Apache Spark でデータリネージを使用するために必要な権限を取得するには、バッチ ワークロードのカスタム サービス アカウントに対して次の IAM ロールを付与するよう管理者に依頼してください。
-
次のいずれかのロールを付与します。
-
Managed Service for Apache Spark Worker (
roles/dataproc.worker) -
データリネージ編集者 (
roles/datalineage.editor) -
データリネージ プロデューサー (
roles/datalineage.producer) -
データリネージ管理者 (
roles/datalineage.admin)
-
Managed Service for Apache Spark Worker (
ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。
必要な権限は、カスタム ロールや他の事前定義 ロールから取得することもできます。
Spark データリネージを有効にする
プロジェクト、バッチ ワークロード、インタラクティブ セッションで Spark データリネージを有効にできます。
プロジェクト レベルでデータリネージを有効にする
プロジェクト レベルで Spark データリネージを有効にすると、バッチ ワークロードまたはインタラクティブ セッションで実行される後続の Spark ジョブで Spark データリネージが有効になります。
プロジェクトで Spark データリネージを有効にするには、 次のカスタム プロジェクト メタデータを設定します。
| キー | 値 |
|---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_LINEAGE_ENABLED メタデータを false に設定すると、プロジェクトの Spark データリネージを無効にできます。
Spark バッチ ワークロードでデータリネージを有効にする
バッチ ワークロードでデータリネージを有効にするには、ワークロードを送信するときに spark.dataproc.lineage.enabled プロパティを true に設定します。この設定は、プロジェクト レベルの Spark データリネージ
設定よりも優先されます。プロジェクト レベルで Spark
データリネージが無効になっていても、バッチ ワークロードで有効になっている場合は、バッチ ワークロードの設定
が優先されます。
ワークロードを送信するときに spark.dataproc.lineage.enabled プロパティを false に設定すると、Spark バッチ ワークロードの Spark データリネージを無効にできます。
この例では、gcloud CLI を使用して、Spark リネージが有効になっているバッチ lineage-example.py ワークロードを送信します。
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
次の lineage-example.py コードは、一般公開の BigQuery テーブルからデータを読み取り、出力を既存の BigQuery データセットの新しいテーブルに書き込みます。一時ストレージには Cloud Storage バケットを使用します。
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
次のように置き換えます。
- REGION: ワークロードを実行するリージョン
- BUCKET: 依存関係を保存する既存の Cloud Storage バケットの名前
- PROJECT_ID、DATASET、TABLE: プロジェクト ID、既存の BigQuery データセットの名前、 データセットに作成する新しいテーブルの名前(テーブルが存在しない場合)
リネージグラフは、Knowledge Catalog UI で表示できます。
Spark インタラクティブ セッションまたはセッション テンプレートでデータリネージを有効にする
Spark インタラクティブ セッションまたはセッション テンプレートでデータリネージを有効にするには、
spark.dataproc.lineage.enabled プロパティを true に設定します。
セッションまたはセッション テンプレートを作成するときに。この設定は、プロジェクト レベルの Spark データリネージ
設定よりも優先されます。プロジェクト レベルで Spark
データリネージが無効になっていても、インタラクティブ セッションで有効になっている場合は、インタラクティブ セッションの設定が優先されます。
インタラクティブ セッションまたはセッション テンプレートを作成するときに spark.dataproc.lineage.enabled プロパティを false に設定すると、Spark インタラクティブ セッションまたはセッション テンプレートの Spark データリネージを無効にできます。
次の PySpark ノートブック コードは、Spark データリネージが有効になっている Managed Service for Apache Spark インタラクティブ セッションを構成します。次に、一般公開の BigQuery Shakespeare データセットで単語数クエリを実行する Spark Connect セッションを作成し、出力を 既存の BigQuery データセットの新しいテーブルに書き込みます( BigQuery Studio ノートブックで Spark セッションを作成するを参照)。
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
次のように置き換えます。
- PROJECT_ID、DATASET、TABLE: プロジェクト ID、既存の BigQuery データセットの名前、 データセットに作成する新しいテーブルの名前(テーブルが存在しない場合)
データリネージグラフを表示するには、BigQuery の [エクスプローラ] ページのナビゲーション パネルに表示されている宛先テーブル名をクリックし、テーブルの詳細パネルで [リネージ] タブを選択します。
Knowledge Catalog でリネージを表示する
リネージグラフには、プロジェクト リソースとそれらを作成したプロセスの関係が表示されます。コンソールでデータリネージ情報を表示することも、Data Lineage API から JSON データの形式で取得することもできます。 Google Cloud
次のステップ
- データリネージの詳細を学習する。
- インタラクティブ ラボでデータリネージを試す: Dataplex でデータリネージと OpenLineage を使用してデータ更新をキャプチャして探索する。