Lightning Engine で Google Cloud Apache Spark 向け Serverless を高速化する

このドキュメントでは、Lightning Engine を有効にして、Apache Spark 向け Serverless のバッチ ワークロードとインタラクティブ セッションを高速化する方法について説明します。

概要

Lightning Engine は、マルチレイヤの最適化エンジンを搭載した高性能のクエリ アクセラレータです。このエンジンは、クエリや実行の最適化などの従来の最適化手法と、ファイル システムレイヤやデータアクセス コネクタでのキュレートされた最適化の両方を実行します。

次の図に示すように、Lightning Engine は TPC-H のようなワークロード(10 TB データセット サイズ)で Spark クエリの実行パフォーマンスを高速化します。

詳細については、Lightning Engine のご紹介 - Apache Spark を次世代のパフォーマンスにをご覧ください。

Lightning Engine の利用状況

  • Lightning Engine は、一般提供されている Apache Spark 向け Serverless のサポート対象ランタイム(現在のランタイムは 1.22.22.3。Spark ランタイム 3.0 では使用できません)で使用できます。
  • Lightning Engine は、Apache Spark 向け Serverless のプレミアム料金ティアでのみ使用できます。
    • バッチ ワークロード: Lightning Engine は、プレミアム ティアのバッチ ワークロードで自動的に有効になります。このため、ご対応は不要です。
    • インタラクティブ セッション: Lightning Engine は、インタラクティブ セッションではデフォルトで有効になっていません。有効にするには、Lightning Engine を有効にするをご覧ください。
    • セッション テンプレート: セッション テンプレートでは、Lightning Engine はデフォルトで有効になっていません。有効にするには、Lightning Engine を有効にするをご覧ください。

Lightning Engine を有効にする

以降のセクションでは、Apache Spark 用サーバーレスのバッチ ワークロード、セッション テンプレート、インタラクティブ セッションで Lightning エンジンを有効にする方法について説明します。

バッチ ワークロード

バッチ ワークロードで Lightning Engine を有効にする

Google Cloud コンソール、Google Cloud CLI、または Dataproc API を使用して、バッチ ワークロードで Lightning Engine を有効にできます。

コンソール

Google Cloud コンソールを使用して、バッチ ワークロードで Lightning Engine を有効にします。

  1. Google Cloud コンソールで次の操作を行います。

    1. Dataproc バッチに移動します。
    2. [作成] をクリックして、[バッチを作成] ページを開きます。
  2. 次のフィールドを選択して入力します。

    • コンテナ:
    • Tier の構成:

      • [Premium] を選択します。これにより、「LIGHTNING ENGINE を有効にして Spark のパフォーマンスを向上させる」が自動的に有効になり、チェックされます。

      プレミアム ティアを選択すると、[ドライバ コンピューティング ティア] と [エグゼキュータ コンピューティング ティア] が Premium に設定されます。これらのプレミアム ティアのコンピューティング設定は自動的に設定され、3.0 より前のランタイムを使用するバッチではオーバーライドできません。

      ドライバ ディスクのティアエグゼキュータ ディスクのティアPremium に構成するか、デフォルトの Standard ティア値のままにします。プレミアム ディスク階層を選択した場合は、ディスクサイズを選択する必要があります。詳細については、リソース割り当てプロパティをご覧ください。

    • プロパティ: 省略可: ネイティブ クエリ実行ランタイムを選択する場合は、次の Key(プロパティ名)と Value のペアを入力します。

      キー
      spark.dataproc.lightningEngine.runtime 先住民

  3. 他のバッチ ワークロードの設定を入力、選択、確認します。Spark バッチ ワークロードを送信するをご覧ください。

  4. [送信] をクリックして、Spark バッチ ワークロードを実行します。

gcloud

次の gcloud CLI gcloud dataproc batches submit spark コマンドフラグを設定して、バッチ ワークロードで Lightning Engine を有効にします。

gcloud dataproc batches submit spark \
    --project=PROJECT_ID \
    --region=REGION \
    --properties=dataproc.tier=premium \
    OTHER_FLAGS_AS_NEEDED

注:

  • PROJECT_ID: 実際の Google Cloud プロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
  • REGION: ワークロードを実行できる利用可能な Compute Engine リージョン
  • --properties=dataproc.tier=premium。プレミアム ティアを設定すると、バッチ ワークロードに次のプロパティが自動的に設定されます。

    • spark.dataproc.engine=lightningEngine は、バッチ ワークロードに Lightning Engine を選択します。
    • spark.dataproc.driver.compute.tierspark.dataproc.executor.compute.tierpremium に設定されます(リソース割り当てプロパティをご覧ください)。3.0 より前のランタイムを使用するバッチでは、プレミアム ティアのコンピューティング設定を自動的に設定することはできません。
  • その他のプロパティ

    • ネイティブ クエリエンジン: spark.dataproc.lightningEngine.runtime=native ネイティブ クエリ実行ランタイムを選択する場合は、このプロパティを追加します。

    • ディスクの階層とサイズ: デフォルトでは、ドライバとエグゼキュータのディスクサイズは standard の階層とサイズに設定されています。プロパティを追加して、premium ディスクの階層とサイズ(375 GiB の倍数)を選択できます。
      詳細については、リソース割り当てプロパティをご覧ください。

  • OTHER_FLAGS_AS_NEEDED: Spark バッチ ワークロードを送信するをご覧ください。

API

バッチ ワークロードで Lightning Engine を有効にするには、batches.create リクエストの一部として、RuntimeConfig.properties に "dataproc.tier":"premium" を追加します。プレミアム ティアを設定すると、バッチ ワークロードに次のプロパティが自動的に設定されます。

  • spark.dataproc.engine=lightningEngine は、バッチ ワークロードに Lightning Engine を選択します。
  • spark.dataproc.driver.compute.tierspark.dataproc.executor.compute.tierpremium に設定されます(リソース割り当てプロパティをご覧ください)。3.0 より前のランタイムを使用するバッチでは、プレミアム ティアのコンピューティング設定を自動的に設定することはできません。

その他 RuntimeConfig.properties:

  • ネイティブ クエリエンジン: spark.dataproc.lightningEngine.runtime:nativeネイティブ クエリ実行ランタイムを選択する場合は、このプロパティを追加します。

  • ディスクの階層とサイズ: デフォルトでは、ドライバとエグゼキュータのディスクサイズは standard の階層とサイズに設定されています。プロパティを追加して、premium ティアとサイズ(375 GiB の倍数)を選択できます。
    詳細については、リソース割り当てプロパティをご覧ください。

他のバッチ ワークロード API フィールドを設定するには、Spark バッチ ワークロードを送信するをご覧ください。

セッション テンプレート

セッション テンプレートで Lightning Engine を有効にする

Google Cloud コンソール、Google Cloud CLI、または Dataproc API を使用して、Jupyter セッションまたは Spark Connect セッションのセッション テンプレートで Lightning Engine を有効にできます。

コンソール

Google Cloud コンソールを使用して、バッチ ワークロードで Lightning Engine を有効にします。

  1. Google Cloud コンソールで次の操作を行います。

    1. Dataproc セッション テンプレートに移動します。
    2. [作成] をクリックして、[セッション テンプレートの作成] ページを開きます。
  2. 次のフィールドを選択して入力します。

    • セッション テンプレート情報:
      • [Lightning Engine を有効にして Spark のパフォーマンスを向上させる] を選択します。
    • 実行構成:
    • プロパティ: 次の Key(プロパティ名)と Value のペアを入力して、プレミアム ティアを選択します。

      キー
      dataproc.tier プレミアム
      spark.dataproc.engine lightningEngine

      省略可: 次の Key(プロパティ名)と Value のペアを入力して、ネイティブ クエリ実行ランタイムを選択します。

      キー
      spark.dataproc.lightningEngine.runtime native

  3. 他のセッション テンプレートの設定を入力、選択、または確認します。セッション テンプレートを作成するをご覧ください。

  4. [送信] をクリックしてセッション テンプレートを作成します。

gcloud

gcloud CLI を使用して、Apache Spark 用 Serverless セッション テンプレートを直接作成することはできません。代わりに、gcloud beta dataproc session-templates import コマンドを使用して既存のセッション テンプレートをインポートし、インポートしたテンプレートを編集して Lightning Engine と必要に応じてネイティブ クエリ ランタイムを有効にしてから、gcloud beta dataproc session-templates export コマンドを使用して編集したテンプレートをエクスポートします。

API

セッション テンプレートで Lightning Engine を有効にするには、sessionTemplates.create リクエストの一部として、RuntimeConfig.properties に「dataproc.tier」:「premium」と「spark.dataproc.engine」:「lightningEngine」を追加します。

その他 RuntimeConfig.properties:

  • ネイティブ クエリエンジン: spark.dataproc.lightningEngine.runtime:native: このプロパティを RuntimeConfig.properties に追加して、ネイティブ クエリ実行ランタイムを選択します。

他のセッション テンプレート API フィールドを設定するには、セッション テンプレートを作成するをご覧ください。

インタラクティブ セッション

インタラクティブ セッションで Lightning Engine を有効にする

Google Cloud CLI または Dataproc API を使用して、Apache Spark 用 Serverless のインタラクティブ セッションで Lightning Engine を有効にできます。BigQuery Studio ノートブックのインタラクティブ セッションで Lightning Engine を有効にすることもできます。

gcloud

次の gcloud CLI gcloud beta dataproc sessions create spark コマンドフラグを設定して、インタラクティブ セッションで Lightning Engine を有効にします。

gcloud beta dataproc sessions create spark \
    --project=PROJECT_ID \
    --location=REGION \
    --properties=dataproc.tier=premium,spark.dataproc.engine=lightningEngine \
    OTHER_FLAGS_AS_NEEDED

注:

  • PROJECT_ID: 実際の Google Cloud プロジェクト ID。プロジェクト ID は、 Google Cloud コンソールのダッシュボードの [プロジェクト情報] セクションに表示されます。
  • REGION: ワークロードを実行できる利用可能な Compute Engine リージョン
  • --properties=dataproc.tier=premium,spark.dataproc.engine=lightningEngine。これらのプロパティにより、セッションで Lightning Engine が有効になります。

  • その他のプロパティ:

    • ネイティブ クエリエンジン: spark.dataproc.lightningEngine.runtime=native: このプロパティを追加して、ネイティブ クエリ実行ランタイムを選択します。
  • OTHER_FLAGS_AS_NEEDED: インタラクティブ セッションを作成するをご覧ください。

API

セッションで Lightning Engine を有効にするには、sessions.create リクエストの一部として、RuntimeConfig.properties に「dataproc.tier」:「premium」と「spark.dataproc.engine」:「lightningEngine」を追加します。

その他 RuntimeConfig.properties:

* ネイティブ クエリエンジン: spark.dataproc.lightningEngine.runtime:native: ネイティブ クエリ実行ランタイムを選択する場合は、このプロパティを RuntimeConfig.properties に追加します。

他のセッション テンプレート API フィールドを設定するには、インタラクティブ セッションを作成するをご覧ください。

BigQuery ノートブック

Lightning Engine は、BigQuery Studio PySpark ノートブックでセッションを作成するときに有効にできます。

from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session = Session()

# Enable Lightning Engine.
session.runtime_config.properties["dataproc.tier"] = "premium"
session.runtime_config.properties["spark.dataproc.engine"] = "lightningEngine"

# Enable THE Native Query Execution runtime.
session.runtime_config.properties["spark.dataproc.lightningEngine.runtime"] = "native"

# Create the Spark session.
spark = (
   DataprocSparkSession.builder
     .appName("APP_NAME")
     .dataprocSessionConfig(session)
     .getOrCreate())

# Add Spark application code here:

Lightning Engine の設定を確認する

Google Cloud コンソール、Google Cloud CLI、または Dataproc API を使用して、バッチ ワークロード、セッション テンプレート、またはインタラクティブ セッションの Lightning Engine 設定を確認できます。

バッチ ワークロード

  • バッチの階層premium に設定され、エンジンLightning Engine に設定されていることを確認するには:

    • Google Cloud コンソール: [バッチ] ページで、バッチの [階層] 列と [エンジン] 列を確認します。[バッチ ID] をクリックして、バッチの詳細ページでこれらの設定を表示することもできます。
    • gcloud CLI: gcloud dataproc batches describe コマンドを実行します。
    • API: batches.get リクエストを発行します。

セッション テンプレート

  • セッション テンプレートの engineLightning Engine に設定されていることを確認するには:

    • Google Cloud コンソール: [セッション テンプレート] ページの [エンジン] 列で、テンプレートを確認します。セッション テンプレートの [名前] をクリックして、セッション テンプレートの詳細ページでこの設定を表示することもできます。
    • gcloud CLI: gcloud beta dataproc session-templates describe コマンドを実行します。
    • API: sessionTemplates.get リクエストを発行します。

インタラクティブ セッション

  • インタラクティブ セッションの場合、エンジンLightning Engine に設定されます。

    • Google Cloud コンソール: [インタラクティブ セッション] ページで、テンプレートの [エンジン] 列を確認します。[インタラクティブ セッション ID] をクリックすると、セッション テンプレートの詳細ページでこの設定を表示することもできます。
    • gcloud CLI: gcloud beta dataproc sessions describe コマンドを実行します。
    • API: sessions.get リクエストを発行します。

ネイティブ クエリの実行

ネイティブ クエリ実行(NQE)は、Google ハードウェア向けに設計された Apache GlutenVelox に基づくネイティブな実装によってパフォーマンスを向上させる Lightning Engine のオプション機能です。

ネイティブ クエリ実行ランタイムには、既存の Spark 構成を変更することなく、オフヒープ メモリとオンヒープ メモリを動的に切り替えるための統合メモリ管理が含まれています。NQE は、演算子、関数、Spark データ型への対応を拡張しているほか、ネイティブ エンジンを活用してオペレーションのプッシュダウンを最適化する機会を自動的に特定するインテリジェンスも備えています。

ネイティブ クエリ実行ワークロードを特定する

ネイティブ クエリ実行は、次のようなシナリオで使用します。

  • Parquet ファイルと ORC ファイルからデータを読み取る Spark Dataframe API、Spark Dataset API、Spark SQL クエリ。出力ファイル形式は、ネイティブ クエリ実行のパフォーマンスに影響しません。

  • ネイティブ クエリ実行の認定ツールで推奨されるワークロード。

次のデータ型の入力があるワークロードでは、ネイティブ クエリの実行はおすすめしません。

  • バイト: ORC と Parquet
  • タイムスタンプ: ORC
  • 構造体、配列、マップ: Parquet

ネイティブ クエリ実行の制限事項

次のシナリオでネイティブ クエリ実行を有効にすると、例外、Spark の非互換性、ワークロードのデフォルトの Spark エンジンへのフォールバックが発生する可能性があります。

フォールバック

次の実行でのネイティブ クエリの実行により、ワークロードが Spark 実行エンジンにフォールバックし、回帰または失敗が発生する可能性があります。

  • ANSI: ANSI モードが有効になっている場合、実行は Spark にフォールバックします。

  • 大文字と小文字を区別するモード: ネイティブ クエリ実行は、Spark のデフォルトの大文字と小文字を区別しないモードのみをサポートします。大文字と小文字を区別するモードが有効になっている場合、正しくない結果が返されることがあります。

  • パーティション分割テーブルのスキャン: ネイティブ クエリ実行は、パスにパーティション情報が含まれている場合にのみパーティション分割テーブルのスキャンをサポートします。それ以外の場合、ワークロードは Spark 実行エンジンにフォールバックします。

互換性のない動作

次の場合は、ネイティブ クエリ実行を使用すると、互換性のない動作や誤った結果が生じる可能性があります。

  • JSON 関数: ネイティブ クエリ実行では、単一引用符ではなく二重引用符で囲まれた文字列がサポートされます。単一引用符を使用すると、正しくない結果が返されます。get_json_object 関数でパスに「*」を使用すると、NULL が返されます。

  • Parquet 読み取り構成:

    • ネイティブ クエリの実行では、spark.files.ignoreCorruptFilestrue に設定されている場合でも、デフォルトの false 値に設定されているとみなされます。
    • ネイティブ クエリ実行は spark.sql.parquet.datetimeRebaseModeInRead を無視し、Parquet ファイルの内容のみを返します。以前のハイブリッド(ユリウス暦とグレゴリオ暦)カレンダーと先発グレゴリオ暦カレンダーの違いは考慮されません。Spark の結果は異なる場合があります。
  • NaN: サポートされていません。たとえば、数値の比較で NaN を使用すると、予期しない結果が生じる可能性があります。

  • Spark カラム型読み取り: Spark カラム型ベクトルがネイティブ クエリ実行と互換性がないため、致命的なエラーが発生する可能性があります。

  • スピル: シャッフル パーティションが大きな数に設定されている場合、ディスクへのスピル機能が OutOfMemoryException をトリガーすることがあります。この例外が発生した場合は、パーティションの数を減らすことで例外を解消できます。