このドキュメントでは、Google Cloud Serverless for Apache Spark バッチ ワークロードとインタラクティブ セッションで、プロジェクト、バッチ ワークロード、またはインタラクティブ セッション レベルでデータリネージを有効にする方法について説明します。
概要
データリネージは Dataplex Universal Catalog の機能で、システム内でのデータの移動(データの送信元、データの通過先、データに適用される変換)を追跡できます。
Google Cloud Apache Spark 向けサーバーレスのワークロードとセッションは、リネージ イベントをキャプチャして、Dataplex Universal Catalog Data Lineage API にパブリッシュします。Apache Spark 向け Serverless は、OpenLineage Spark プラグインを使用して OpenLineage を介して Data Lineage API と統合されます。
Dataplex Universal Catalog でリネージ情報にアクセスするには、リネージグラフと Data Lineage API を使用します。詳細については、Dataplex Universal Catalog でリネージグラフを表示するをご覧ください。
対象
BigQuery と Cloud Storage のデータソースをサポートするデータリネージは、サポートされている Apache Spark 向け Serverless ランタイム バージョンで実行されるワークロードとセッションで使用できますが、次の例外と制限があります。
- データリネージは、SparkR または Spark ストリーミング ワークロードやセッションでは使用できません。
始める前に
Google Cloud コンソールのプロジェクト セレクタのページで、Apache Spark 向け Serverless のワークロードまたはセッションに使用するプロジェクトを選択します。
データリネージ API を有効にします。
Spark データ リネージの今後の変更: Data Lineage API を有効にすると(サービスのリネージ取り込みを制御するをご覧ください)、追加のプロジェクト、バッチ ワークロード、インタラクティブ セッションの設定を必要とせずに、Spark データ リネージをプロジェクト、バッチ ワークロード、インタラクティブ セッションで自動的に使用できるようになる変更については、Apache Spark 向け Serverless のリリースノートをご覧ください。
必要なロール
バッチ ワークロードがデフォルトの Apache Spark 用サーバーレス サービス アカウントを使用している場合、データリネージに必要な権限を含む Dataproc Worker ロールが付与されています。
ただし、バッチ ワークロードでカスタム サービス アカウントを使用してデータリネージを有効にする場合は、データリネージに必要な権限を含む、次の段落に記載されているロールのいずれかをカスタム サービス アカウントに付与する必要があります。
Dataproc でデータリネージを使用するために必要な権限を取得するには、バッチ ワークロードのカスタム サービス アカウントに対して次の IAM ロールを付与するよう管理者に依頼してください。
-
次のいずれかのロールを付与します。
-
Dataproc ワーカー(
roles/dataproc.worker) -
データリネージ編集者(
roles/datalineage.editor) -
データリネージ プロデューサー(
roles/datalineage.producer) -
データリネージ管理者(
roles/datalineage.admin)
-
Dataproc ワーカー(
ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
必要な権限は、カスタムロールや他の事前定義ロールから取得することもできます。
Spark データリネージを有効にする
Spark データリネージは、プロジェクト、バッチ ワークロード、インタラクティブ セッションで有効にできます。
プロジェクト レベルでデータリネージを有効にする
プロジェクト レベルで Spark データリネージを有効にすると、バッチ ワークロードまたはインタラクティブ セッションで実行される後続の Spark ジョブで Spark データリネージが有効になります。
プロジェクトで Spark データリネージを有効にするには、次のカスタム プロジェクト メタデータを設定します。
| キー | 値 |
|---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_LINEAGE_ENABLED メタデータを false に設定すると、プロジェクトの Spark データリネージを無効にできます。
Spark バッチ ワークロードのデータリネージを有効にする
バッチ ワークロードでデータリネージを有効にするには、ワークロードを送信するときに spark.dataproc.lineage.enabled プロパティを true に設定します。この設定は、プロジェクト レベルの Spark データリネージ設定をオーバーライドします。プロジェクト レベルで Spark データリネージが無効になっていても、バッチ ワークロードで有効になっている場合は、バッチ ワークロードの設定が優先されます。
この例では、gcloud CLI を使用して、Spark リネージが有効になっているバッチ lineage-example.py ワークロードを送信します。
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
次の lineage-example.py コードは、一般公開の BigQuery テーブルからデータを読み取り、出力を既存の BigQuery データセットの新しいテーブルに書き込みます。一時ストレージには Cloud Storage バケットを使用します。
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
次のように置き換えます。
- REGION: ワークロードを実行するリージョン
- BUCKET: 依存関係を保存する既存の Cloud Storage バケットの名前
- PROJECT_ID、DATASET、TABLE: プロジェクト ID、既存の BigQuery データセットの名前、データセットに作成する新しいテーブルの名前(テーブルが存在しない場合)
リネージグラフは、Dataplex Universal Catalog UI で表示できます。
Spark インタラクティブ セッションのデータリネージを有効にする
Spark インタラクティブ セッションでデータリネージを有効にするには、セッションまたはセッション テンプレートを作成するときに、spark.dataproc.lineage.enabled プロパティを true に設定します。この設定は、プロジェクト レベルの Spark データリネージ設定をオーバーライドします。プロジェクト レベルで Spark データリネージが無効になっていても、インタラクティブ セッションで有効になっている場合は、インタラクティブ セッションの設定が優先されます。
次の PySpark ノートブック コードは、Spark データ リネージが有効になっている Apache Spark 向け Serverless のインタラクティブ セッションを構成します。次に、一般公開の BigQuery Shakespeare データセットでワードカウント クエリを実行し、既存の BigQuery データセットの新しいテーブルに出力を書き込む Spark Connect セッションを作成します(BigQuery Studio ノートブックで Spark セッションを作成するをご覧ください)。
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
次のように置き換えます。
- PROJECT_ID、DATASET、TABLE: プロジェクト ID、既存の BigQuery データセットの名前、データセットに作成する新しいテーブルの名前(テーブルが存在しない場合)
データ リネージ グラフを表示するには、BigQuery の [エクスプローラ] ページのナビゲーション パネルに表示されている宛先テーブル名をクリックし、テーブルの詳細パネルで [リネージ] タブを選択します。
Dataplex Universal Catalog でリネージを表示する
リネージグラフには、プロジェクト リソースとそれらを作成したプロセスの関係が表示されます。 Google Cloud コンソールでデータリネージ情報を表示することも、Data Lineage API から JSON データとして取得することもできます。
次のステップ
- データリネージの詳細を学習する。
- インタラクティブ ラボでデータリネージを試す: Dataplex でデータリネージと OpenLineage を使用してデータ更新をキャプチャして探索する。