このドキュメントでは、Apache Spark 向け Serverless のバッチ ワークロードとインタラクティブ セッションで、 プロジェクト、 バッチ ワークロード、または インタラクティブ セッションの各レベルで データリネージを有効にする方法について説明します。Google Cloud
概要
データリネージは Dataplex Universal Catalog の機能で、システム内でのデータの移動(データの送信元、データの通過先、データに適用される変換)を追跡できます。
Google Cloud Apache Spark 向け Serverless のワークロードとセッションは、リネージ イベントをキャプチャして、 Dataplex Universal Catalog Data Lineage API に公開します。 Apache Spark 向け Serverless は、 OpenLineage を介して Data Lineage API と統合され、 OpenLineage Spark プラグインを使用します。
リネージ情報には、 リネージグラフ と Data Lineage API を使用して Dataplex Universal Catalog を介してアクセスできます。 詳細については、Dataplex Universal Catalog でリネージグラフを表示するをご覧ください。
対象範囲、機能、制限事項
BigQuery と Cloud Storage のデータソースをサポートするデータリネージは、Apache Spark 向け Serverless ランタイム バージョン 1.2、2.2、2.3、3.0 で実行されるワークロードとセッションで使用できますが、次の例外と制限があります。
- SparkR または Spark ストリーミングのワークロードまたはセッションでは、データリネージは使用できません。
始める前に
コンソールのプロジェクト セレクタのページで、Apache Spark 向け Serverless のワークロードまたはセッションに使用するプロジェクトを選択します。 Google Cloud
データリネージ API を有効にします。
必要なロール
バッチ ワークロードで
デフォルトの Apache Spark 向け Serverless のサービス アカウントを使用している場合、
Dataproc Worker
ロールが付与され、データリネージが有効になります。追加のアクションは不要です。
ただし、バッチ ワークロードでカスタム サービス アカウントを使用して データリネージを有効にする場合は、次の段落で説明するように、カスタム サービス アカウントに必要なロールを付与する必要があります。
Dataproc でデータリネージを使用するために必要な権限を取得するには、バッチ ワークロードのカスタム サービス アカウントに対して次の IAM ロールを付与するよう管理者に依頼してください。
-
次のいずれかのロールを1 つ 付与します。
-
Dataproc ワーカー(
roles/dataproc.worker) -
データリネージ編集者(
roles/datalineage.editor) -
データリネージ プロデューサー(
roles/datalineage.producer) -
データリネージ管理者(
roles/datalineage.admin)
-
Dataproc ワーカー(
ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。
必要な権限は、カスタム ロールや他の事前定義 ロールから取得することもできます。
プロジェクト レベルでデータリネージを有効にする
データリネージはプロジェクト レベルで有効にできます。プロジェクト レベルで有効にすると、プロジェクトで実行する後続のすべてのバッチ ワークロードとインタラクティブ セッションで Spark リネージが有効になります。
プロジェクト レベルでデータリネージを有効にする方法
プロジェクト レベルでデータリネージを有効にするには、 次のカスタム プロジェクト メタデータを設定します。
| キー | 値 |
|---|---|
DATAPROC_LINEAGE_ENABLED |
true |
DATAPROC_CLUSTER_SCOPES |
https://www.googleapis.com/auth/cloud-platform |
DATAPROC_LINEAGE_ENABLED メタデータを false に設定すると、プロジェクト レベルでデータリネージを無効にできます。
Spark バッチ ワークロードのデータリネージを有効にする
ワークロードを送信するときに spark.dataproc.lineage.enabled プロパティを true に設定すると、バッチ ワークロードでデータリネージを有効にできます。
バッチ ワークロードの例
この例では、Spark リネージ
が有効になっているバッチ lineage-example.py ワークロードを送信します。
gcloud dataproc batches submit pyspark lineage-example.py \ --region=REGION \ --deps-bucket=gs://BUCKET \ --properties=spark.dataproc.lineage.enabled=true
lineage-example.py は、一般公開の BigQuery
テーブルからデータを読み取り、出力を既存の BigQuery
データセットの新しいテーブルに書き込みます。一時ストレージには Cloud Storage バケットを使用します。
#!/usr/bin/env python
from pyspark.sql import SparkSession
import sys
spark = SparkSession \
.builder \
.appName('LINEAGE_BQ_TO_BQ') \
.getOrCreate()
source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
.option('table', source) \
.load()
words.createOrReplaceTempView('words')
word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.option('writeMethod', 'direct') \
.save()
次のように置き換えます。
REGION:ワークロードを実行するリージョンを選択します。
BUCKET: 依存関係を保存する既存の Cloud Storage バケットの名前。
PROJECT_ID、DATASET、TABLE:プロジェクト ID、既存の BigQuery データセットの名前、データセットに作成する新しいテーブルの名前(テーブルが存在しない場合)を挿入します。
リネージグラフは、Dataplex Universal Catalog UI で表示できます。
Spark インタラクティブ セッションのデータリネージを有効にする
セッションまたはセッション テンプレートを作成するときに spark.dataproc.lineage.enabled プロパティを true に設定すると、Spark インタラクティブ セッションでデータリネージを有効にできます。
インタラクティブ セッションの例
次の PySpark ノートブック コードは、Spark データリネージが有効になっている Apache Spark 向け Serverless のインタラクティブ セッションを構成します。次に、一般公開の BigQuery Shakespeare データセットでワードカウント クエリを実行する Spark Connect セッションを作成し、出力を 既存の BigQuery データセットの新しいテーブルに書き込みます。
# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session
session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"
# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession
spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()
# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
.option('table', destination_table) \
.save()
次のように置き換えます。
- PROJECT_ID、DATASET、TABLE:プロジェクト ID、既存の BigQuery データセットの名前、データセットに作成する新しいテーブルの名前(テーブルが存在しない場合)を挿入します。
データリネージグラフを表示するには、BigQuery の [エクスプローラ] ページのナビゲーション パネルに表示されている宛先テーブル名をクリックし、テーブルの詳細パネルで [リネージ] タブを選択します。
Dataplex Universal Catalog でリネージを表示する
リネージグラフには、プロジェクト リソースとそれらを作成したプロセスの関係が表示されます。コンソールでデータリネージ情報を表示することも、Data Lineage API から JSON データの形式で取得することもできます。 Google Cloud
次のステップ
- データリネージの詳細を学習する。
- インタラクティブ ラボで試す: データリネージと OpenLineage を使用してデータ更新をキャプチャして探索する