Apache Spark 用サーバーレスでデータ リネージを使用する

このドキュメントでは、Apache Spark 向け Serverless のバッチ ワークロードとインタラクティブ セッションで、 プロジェクトバッチ ワークロード、または インタラクティブ セッションの各レベルで データリネージを有効にする方法について説明します。Google Cloud

概要

データリネージは Dataplex Universal Catalog の機能で、システム内でのデータの移動(データの送信元、データの通過先、データに適用される変換)を追跡できます。

Google Cloud Apache Spark 向け Serverless のワークロードとセッションは、リネージ イベントをキャプチャして、 Dataplex Universal Catalog Data Lineage API に公開します。 Apache Spark 向け Serverless は、 OpenLineage を介して Data Lineage API と統合され、 OpenLineage Spark プラグインを使用します。

リネージ情報には、 リネージグラフData Lineage API を使用して Dataplex Universal Catalog を介してアクセスできます。 詳細については、Dataplex Universal Catalog でリネージグラフを表示するをご覧ください。

対象範囲、機能、制限事項

BigQuery と Cloud Storage のデータソースをサポートするデータリネージは、Apache Spark 向け Serverless ランタイム バージョン 1.22.22.33.0 で実行されるワークロードとセッションで使用できますが、次の例外と制限があります。

  • SparkR または Spark ストリーミングのワークロードまたはセッションでは、データリネージは使用できません。

始める前に

  1. コンソールのプロジェクト セレクタのページで、Apache Spark 向け Serverless のワークロードまたはセッションに使用するプロジェクトを選択します。 Google Cloud

    プロジェクト セレクタに移動

  2. データリネージ API を有効にします。

    API を有効にする

必要なロール

バッチ ワークロードで デフォルトの Apache Spark 向け Serverless のサービス アカウントを使用している場合、 Dataproc Worker ロールが付与され、データリネージが有効になります。追加のアクションは不要です。

ただし、バッチ ワークロードでカスタム サービス アカウントを使用して データリネージを有効にする場合は、次の段落で説明するように、カスタム サービス アカウントに必要なロールを付与する必要があります。

Dataproc でデータリネージを使用するために必要な権限を取得するには、バッチ ワークロードのカスタム サービス アカウントに対して次の IAM ロールを付与するよう管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織に対するアクセス権の管理をご覧ください。

必要な権限は、カスタム ロールや他の事前定義 ロールから取得することもできます。

プロジェクト レベルでデータリネージを有効にする

データリネージはプロジェクト レベルで有効にできます。プロジェクト レベルで有効にすると、プロジェクトで実行する後続のすべてのバッチ ワークロードとインタラクティブ セッションで Spark リネージが有効になります。

プロジェクト レベルでデータリネージを有効にする方法

プロジェクト レベルでデータリネージを有効にするには、 次のカスタム プロジェクト メタデータを設定します

キー
DATAPROC_LINEAGE_ENABLED true
DATAPROC_CLUSTER_SCOPES https://www.googleapis.com/auth/cloud-platform

DATAPROC_LINEAGE_ENABLED メタデータを false に設定すると、プロジェクト レベルでデータリネージを無効にできます。

Spark バッチ ワークロードのデータリネージを有効にする

ワークロードを送信するときに spark.dataproc.lineage.enabled プロパティを true に設定すると、バッチ ワークロードでデータリネージを有効にできます。

バッチ ワークロードの例

この例では、Spark リネージ が有効になっているバッチ lineage-example.py ワークロードを送信します。

gcloud dataproc batches submit pyspark lineage-example.py \
    --region=REGION \
    --deps-bucket=gs://BUCKET \
    --properties=spark.dataproc.lineage.enabled=true

lineage-example.py は、一般公開の BigQuery テーブルからデータを読み取り、出力を既存の BigQuery データセットの新しいテーブルに書き込みます。一時ストレージには Cloud Storage バケットを使用します。

#!/usr/bin/env python

from pyspark.sql import SparkSession
import sys

spark = SparkSession \
  .builder \
  .appName('LINEAGE_BQ_TO_BQ') \
  .getOrCreate()

source = 'bigquery-public-data:samples.shakespeare'
words = spark.read.format('bigquery') \
  .option('table', source) \
  .load()
words.createOrReplaceTempView('words')

word_count = spark.sql('SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .option('writeMethod', 'direct') \
  .save()

次のように置き換えます。

  • REGION:ワークロードを実行するリージョンを選択します。

  • BUCKET: 依存関係を保存する既存の Cloud Storage バケットの名前。

  • PROJECT_IDDATASETTABLE:プロジェクト ID、既存の BigQuery データセットの名前、データセットに作成する新しいテーブルの名前(テーブルが存在しない場合)を挿入します。

リネージグラフは、Dataplex Universal Catalog UI で表示できます。

Spark リネージグラフ

Spark インタラクティブ セッションのデータリネージを有効にする

セッションまたはセッション テンプレートを作成するときに spark.dataproc.lineage.enabled プロパティを true に設定すると、Spark インタラクティブ セッションでデータリネージを有効にできます。

インタラクティブ セッションの例

次の PySpark ノートブック コードは、Spark データリネージが有効になっている Apache Spark 向け Serverless のインタラクティブ セッションを構成します。次に、一般公開の BigQuery Shakespeare データセットでワードカウント クエリを実行する Spark Connect セッションを作成し、出力を 既存の BigQuery データセットの新しいテーブルに書き込みます。

# Configure the Dataproc Serverless interactive session
# to enable Spark data lineage.
from google.cloud.dataproc_v1 import Session

session = Session()
session.runtime_config.properties["spark.dataproc.lineage.enabled"] = "true"

# Create the Spark Connect session.
from google.cloud.dataproc_spark_connect import DataprocSparkSession

spark = DataprocSparkSession.builder.dataprocSessionConfig(session).getOrCreate()

# Run a wordcount query on the public BigQuery Shakespeare dataset.
source = "bigquery-public-data:samples.shakespeare"
words = spark.read.format("bigquery").option("table", source).load()
words.createOrReplaceTempView('words')
word_count = spark.sql(
           'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')

# Output the results to a BigQuery destination table.
destination_table = 'PROJECT_ID:DATASET.TABLE'
word_count.write.format('bigquery') \
  .option('table', destination_table) \
  .save()

次のように置き換えます。

  • PROJECT_IDDATASETTABLE:プロジェクト ID、既存の BigQuery データセットの名前、データセットに作成する新しいテーブルの名前(テーブルが存在しない場合)を挿入します。

データリネージグラフを表示するには、BigQuery の [エクスプローラ] ページのナビゲーション パネルに表示されている宛先テーブル名をクリックし、テーブルの詳細パネルで [リネージ] タブを選択します。

Spark リネージグラフ

Dataplex Universal Catalog でリネージを表示する

リネージグラフには、プロジェクト リソースとそれらを作成したプロセスの関係が表示されます。コンソールでデータリネージ情報を表示することも、Data Lineage API から JSON データの形式で取得することもできます。 Google Cloud

次のステップ