Lightning Engine を使用する

Lightning Engine は次世代の Apache Spark パフォーマンスです。パフォーマンス、費用対効果、運用安定性を大幅に向上させるように設計された独自の機能強化が導入されています。

特典

Lightning Engine の利点は次のとおりです。

  • データ オペレーションの高速化: Cloud Storage のインタラクションを最適化することで、パフォーマンスを大幅に向上させ、 コストを削減できます。最適化には、 メタデータの処理、書き込みワークロード、ベクトル化された I/O などがあります。

  • インテリジェントなクエリ実行: スキャンされるデータを動的に削減し、データ処理を最適化し、より効率的な実行プランを生成して、高速で費用対効果の高いクエリを実現する高度なオプティマイザの機能強化を活用します。

  • AI ワークロードと ML ワークロードの効率化: GPU ベースのワークロードのクラスタ起動時間を短縮し、 AI 向けに最適化されたイメージを使用して安全な環境へのデプロイを簡素化します。

Lightning Engine はパフォーマンスを大幅に向上させますが、具体的な効果はワークロードによって異なります。I/O の制約を受けるオペレーションよりも、Spark Dataframe API、Spark Dataset API、Spark SQL クエリを活用するコンピューティング負荷の高いタスクに最適です。

標準エンジンとの比較

Lightning Engine は、Managed Service for Apache Spark クラスタで Spark ジョブを実行するために使用される標準エンジンの代替です。次の表に、Lightning Engine と標準エンジンのアクティベーション プロパティ、ワークロードの適用性、主なメリットを比較します。

機能 標準エンジン Lightning Engine
CLI のフラグ --engine=default またはフラグを設定しない --engine=lightning
最適な用途 汎用ジョブ、開発、テスト 大幅な高速化を必要とするエンタープライズ規模のワークロード
主な特典 ベースライン パフォーマンス 最適化された Cloud Storage のインタラクション、インテリジェントなクエリ実行

要件

Lightning Engine 機能には次の要件が適用されます。

  • イメージ バージョン: Lightning Engine は、 Managed Service for Apache Spark イメージ バージョン 2.3.3 以降で使用する必要があります。
  • サポートされているジョブ: Spark、PySpark、SparkSQL、SparkR がサポートされています。標準エンジンは、Lightning Engine クラスタに送信された他のジョブタイプで実行されます。

ネイティブ クエリ実行

ネイティブ クエリ実行(NQE)は、特定のジョブの高速化を強化する Lightning Engine のオプション コンポーネントです。これは、Apache Gluten と Velox に基づくネイティブ エンジンで、Google ハードウェア向けに最適化されています。Spark クエリの一部を JVM の外部で実行することで、パフォーマンスを向上させます。

NQE は次の場合におすすめします
Spark Dataframe API と Spark Dataset API を活用するコンピューティング負荷の高いタスク、Parquet ファイルと ORC ファイルからデータを読み取る Spark SQL クエリ。出力ファイルの形式はパフォーマンスに影響しません。
NQE は次の場合におすすめしません
Resilient Distributed Datasets(RDD)、ユーザー定義関数(UDF)、ほとんどの Spark Machine Learning(ML)ライブラリ、ストレージ アクセスによる遅延を伴う I/O の制約を受けるオペレーションに大きく依存するジョブ。

要件

ネイティブ クエリ実行機能には次の要件が適用されます。

  • 実行エンジン: NQE は、クラスタの作成時に Lightning エンジンが有効になっているクラスタでのみ使用できます。

  • オペレーティング システム: Debian-12 オペレーティング システムと Ubuntu-22 オペレーティング システムのみがサポートされています。他の OS を使用する NQE 対応ジョブは失敗します。

  • サポートされているジョブ: Spark、PySpark、SparkSQL、SparkR がサポートされています。標準エンジンは、Lightning Engine クラスタに送信された他のジョブタイプで実行されます(NQE なし)。

  • マシンタイプ: Intel または AMD プロセッサを使用するマシン ファミリーのみが サポートされています。ARM プロセッサを使用する NQE 対応ジョブは失敗します(NQE なしで Lightning Engine を利用できます)。

  • GPU とアクセラレータなし: GPU アクセラレータで送信された NQE 対応ジョブは 失敗します(NQE なしで Lightning Engine を利用できます)。

  • データ型: 次のデータ型の入力はサポートされていません。

    • バイト: ORC と Parquet
    • 構造体、配列、マップ: Parquet

料金

料金については、Managed Service for Apache Spark の料金をご覧ください。

Lightning Engine クラスタを作成する

このセクションでは、クラスタに送信された Spark ジョブで Lightning Engine を有効にする Managed Service for Apache Spark クラスタを作成する方法について説明します。

クラスタの作成時にクラスタでネイティブ クエリ実行(NQE)を有効にすることも、クラスタに送信された特定の Spark ジョブに対して後で NQE を有効にすることもできます。

始める前に

  1. アカウントにログインします。 Google Cloud を初めて使用する場合は、 アカウントを作成して、実際のシナリオで Google プロダクトのパフォーマンスを評価してください。 Google Cloud新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that you have the permissions required to complete this guide.

  4. Verify that billing is enabled for your Google Cloud project.

  5. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  6. Google Cloud CLI をインストールします。

  7. 外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。

  8. gcloud CLI を初期化するには、次のコマンドを実行します:

    gcloud init
  9. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  10. Verify that you have the permissions required to complete this guide.

  11. Verify that billing is enabled for your Google Cloud project.

  12. Enable the Dataproc API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

  13. Google Cloud CLI をインストールします。

  14. 外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。

  15. gcloud CLI を初期化するには、次のコマンドを実行します:

    gcloud init

必要なロール

Managed Service for Apache Spark クラスタを作成し、クラスタにジョブを送信するには、特定の IAM ロールが必要です。組織のポリシーによっては、クラウド プロジェクト オーナーまたはサービス管理者が、これらのロールをユーザーまたはサービス アカウントに付与している場合があります。ロールの付与を確認するには、ロールを付与する必要がありますか?をご覧ください。

ロールの付与については、プロジェクト、 フォルダ、組織へのアクセス権の管理をご覧ください。

ユーザー役割

Managed Service for Apache Spark クラスタの作成に必要な権限を取得するには、次の IAM ロールを付与するよう管理者に依頼してください。

サービス アカウントのロール

Compute Engine のデフォルトのサービス アカウントに、Managed Service for Apache Spark クラスタの作成に必要な 権限を付与するには、 プロジェクトに対する Dataproc ワーカー roles/dataproc.worker)IAM ロールを Compute Engine のデフォルトのサービス アカウントに付与するよう管理者に依頼してください。

クラスタを作成する

次の例では、 コンソール、Google Cloud CLI、Dataproc API、Python 用 Cloud クライアント ライブラリ、または Terraform を使用して Lightning Engine クラスタを作成する方法を示します。 Google Cloud Go、Java、Node.js Cloud クライアント ライブラリを使用して、 Lightning Engine が有効になっているクラスタを作成することもできます。

コンソール

  1. コンソールで、 Google Cloud **Compute Engine で Apache Spark クラスタを作成する** に移動します。詳細については、コンソールで Google Cloud クラスタを作成するをご覧ください。

    Compute Engine で Apache Spark クラスタを作成するに移動

  2. [クラスタを定義する] で、[Lightning Engine を有効にする] [チェックボックス]をオンにします。

  3. 省略可: Spark ジョブでネイティブ実行ランタイムをデフォルトで有効にするには、[ネイティブ実行を有効にする] チェックボックスをオンにします。

  4. 必要に応じて、他のクラスタ設定を構成します。

  5. [作成] をクリックします。

gcloud CLI

  1. Lightning Engine が有効になっているクラスタを作成するには、gcloud dataproc clusters create コマンドを --engine=lightning フラグを指定して実行します。 詳細については、gcloud CLI でクラスタを作成するをご覧ください。

    gcloud dataproc clusters create CLUSTER_NAME \
        --region=REGION \
        --engine=lightning \
        --image-version=2.3
    
  2. 省略可: Spark ジョブでネイティブ実行ランタイムをデフォルトで有効にするには、spark:spark.dataproc.lightningEngine.runtime=native プロパティを含めます。

    gcloud dataproc clusters create CLUSTER_NAME \
        --region=REGION \
        --engine=lightning \
        --image-version=2.3 \
        --properties='spark:spark.dataproc.lightningEngine.runtime=native'
    

API

Lightning Engine が有効になっているクラスタを作成するには、clusters.create リクエストを送信します。詳細については、REST API でクラスタを作成するをご覧ください。

  1. リクエストの本文で、 engine フィールドを LIGHTNINGに設定します。

    {
      "projectId": "PROJECT_ID",
      "clusterName": "CLUSTER_NAME",
      "config": {
        "engine": "LIGHTNING",
        "gceClusterConfig": {},
        "softwareConfig": {
          "imageVersion": "2.3"
        }
      }
    }
    
  2. 省略可: すべてのジョブでネイティブ実行ランタイムをデフォルトで有効にするには、spark:spark.dataproc.lightningEngine.runtime プロパティを含めます。

    {
      "projectId": "PROJECT_ID",
      "clusterName": "CLUSTER_NAME",
      "config": {
        "engine": "LIGHTNING",
        "gceClusterConfig": {},
        "softwareConfig": {
          "imageVersion": "2.3",
          "properties": {
            "spark:spark.dataproc.lightningEngine.runtime": "native"
          }
        }
      }
    }
    

Python

  1. Lightning Engine が有効になっているクラスタを作成するには、create_cluster メソッドを使用し、クラスタ構成の engine フィールドを LIGHTNING に設定します。詳細については、Python でクラスタを作成するをご覧ください。

    from google.cloud import dataproc_v1
    
    def create_lightning_cluster(project_id, region, cluster_name):
        client_options = {"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        cluster_client = dataproc_v1.ClusterControllerClient(client_options=client_options)
    
        cluster = {
            "project_id": project_id,
            "cluster_name": cluster_name,
            "config": {
                "engine": "LIGHTNING",
                "software_config": {
                    "image_version": "2.3-debian12",
                },
            }
        }
    
        operation = cluster_client.create_cluster(
            project_id=project_id,
            region=region,
            cluster=cluster
        )
        result = operation.result()
        print(f"Cluster created successfully: {result.cluster_name}")
    
  2. 省略可: Spark ジョブでネイティブ実行ランタイムをデフォルトで有効にするには、spark:spark.dataproc.lightningEngine.runtime プロパティを含めます。

    from google.cloud import dataproc_v1
    
    def create_lightning_native_cluster(project_id, region, cluster_name):
        client_options = {"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
        cluster_client = dataproc_v1.ClusterControllerClient(client_options=client_options)
    
        cluster = {
            "project_id": project_id,
            "cluster_name": cluster_name,
            "config": {
                "engine": "LIGHTNING",
                "software_config": {
                    "image_version": "2.3-debian12",
                    "properties": {
                        "spark:spark.dataproc.lightningEngine.runtime": "native"
                    }
                }
            }
        }
    
        operation = cluster_client.create_cluster(
            project_id=project_id,
            region=region,
            cluster=cluster
        )
        result = operation.result()
        print(f"Cluster created successfully: {result.cluster_name}")
    

Terraform

  1. google_dataproc_cluster リソース構成で、engine 引数を LIGHTNING に設定します。
  2. 詳細と高度なオプションについては、公式 Terraform ドキュメントで google_dataproc_cluster リソースをご覧ください。

クラスタ エンジンを確認する

コンソール

  1. コンソールで、[クラスタの詳細] ページに移動します。 Google Cloud
  2. [エンジン] フィールドに Lightning Engine 値が表示されていることを確認します。
  3. ネイティブ クエリ実行を有効にした場合は、[ネイティブ実行] フィールドに native が表示されていることを確認します。

gcloud

  1. エンジンと NQE(有効な場合)を確認するには、gcloud dataproc clusters describe コマンドを実行します。

    gcloud dataproc clusters describe CLUSTER_NAME --project=PROJECT_ID --region=REGION
    
  2. 出力で engine プロパティと lightningEngine.runtime プロパティを確認します。

    clusterName: lightning-engine-cluster
    engine: lightningEngine
    lightningEngine.runtime: native
    

Lightning Engine を使用してジョブを送信する

クラスタの作成時に Lightning Engine を有効にした場合、Spark ジョブをクラスタに送信すると、ジョブで Lightning Engine がデフォルトで有効になります。

ジョブのネイティブ クエリ実行を有効にする

Lightning Engine クラスタの作成時にネイティブ クエリ実行(NQE)を有効にした場合、特定のジョブで NQE を無効にしない限り、すべての Spark ジョブが NQE を有効にして実行されます。

Lightning Engine クラスタの作成時に NQE を有効にしなかった場合は、次の例に示すように、ジョブの送信時に NQE を有効にできます。

gcloud

Spark ジョブの送信時にネイティブ クエリ実行を有効にするには、Spark ジョブを 送信するときに、spark.dataproc.lightningEngine.runtime=native プロパティを含めます。

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --properties=spark.dataproc.lightningEngine.runtime=native \
    -- ...

API

Spark ジョブの送信時にネイティブ クエリ実行を有効にするには、リクエストに spark.dataproc.lightningEngine.runtime プロパティを含めます。

{
  "job":{
    "placement":{
      "clusterName": ...
    },
    "sparkJob":{
      "mainClass": ...,
      "properties":{
         "spark.dataproc.lightningEngine.runtime":"native"
      }
    }
  }
}

ジョブのネイティブ クエリ実行を無効にする

Lightning Engine クラスタの作成時にネイティブ クエリ実行(NQE)を有効にした場合、特定のジョブで NQE を無効にしない限り、すべての Spark ジョブが NQE を有効にして実行されます。

次の例に示すように、ジョブの送信時に特定の Spark ジョブの NQE を無効にできます。

gcloud

Spark ジョブを Lightning Engine クラスタに送信するときにネイティブ クエリ実行を無効にするには、Spark ジョブを送信し、 Lightning Engine クラスタに送信し、 spark.dataproc.lightningEngine.runtime=default プロパティを含めます。

gcloud dataproc jobs submit spark \
    --cluster=CLUSTER_NAME \
    --region=REGION \
    --properties=spark.dataproc.lightningEngine.runtime=default \
    -- ...

API

Spark ジョブを Lightning Engine クラスタに送信するときにネイティブ クエリ実行を無効にするには、Spark ジョブを送信し、 Lightning Engine クラスタに送信し、 spark.dataproc.lightningEngine.runtime=default プロパティを含めます。

{
  "job":{
    "placement":{
      "clusterName": ...
    },
    "sparkJob":{
      "mainClass": ...,
      "properties":{
         "spark.dataproc.lightningEngine.runtime":"default"
      }
    }
  }
}

ジョブのネイティブ クエリ実行を確認する

Lightning Engine クラスタにジョブを送信したら、ジョブでネイティブ クエリ実行が有効になっていることを確認できます。

コンソール

  1. コンソールで、[**ジョブの詳細**] ページに移動します。 Google Cloud
  2. [ネイティブ実行] フィールドに native が表示されていることを確認します。

gcloud

  1. gcloud dataproc jobs describe コマンドを実行します。

    gcloud dataproc jobs describe JOB_ID --project=PROJECT_ID --region=REGION
    
  2. [プロパティ] セクションで lightningEngine.runtime の出力を確認します。

    lightningEngine.runtime: native
    

構成パラメータ

次の表に、Lightning Engine とネイティブ クエリ実行の主な構成パラメータを示します。

パラメータ名 説明 適用可能なエンジン デフォルト値 デフォルト値(Lightning Engine) ユーザーによるオーバーライド可能(ジョブレベル) 範囲
--engine クラスタの作成時にエンジンを選択するクラスタレベルの設定。 クラスタ全体 default lightning いいえ クラスタ
spark:spark.dataproc.lightningEngine.runtime クラスタの作成時に Lightning エンジンのランタイムを選択するクラスタレベルの設定。 Lightning のみ default default いいえ クラスタ
spark.dataproc.lightningEngine.runtime Lightning Engine 内でネイティブ クエリ実行(NQE)を有効または無効にします。 Lightning のみ default default はい。native または default に設定できます。 ジョブ

制限事項

次のシナリオでネイティブ クエリ実行を有効にすると、例外、Spark の非互換性、ワークロードのデフォルトの Spark エンジンへのフォールバックが発生する可能性があります。

フォールバック

次のシナリオでネイティブ クエリ実行を行うと、ワークロードが Spark 実行エンジンにフォールバックする可能性があります。

  • ANSI: ANSI モードが有効になっている場合、実行は Spark にフォールバックします。
  • 大文字と小文字を区別するモード: ネイティブ クエリ実行では、Spark のデフォルトの大文字と小文字を区別しないモードのみがサポートされています。大文字と小文字を区別するモードが有効になっている場合、誤った結果が生じる可能性があります。
  • 分割テーブル スキャン: ネイティブ クエリ実行では、パスにパーティション情報が含まれている場合にのみ、分割 テーブル スキャンがサポートされます。それ以外の場合、ワークロードは Spark 実行エンジンにフォールバックします。

非互換性のある動作

次のケースでネイティブ クエリ実行を使用すると、非互換性のある動作や誤った結果が生じる可能性があります。

  • JSON 関数: ネイティブ クエリ実行では、単一引用符ではなく 二重引用符で囲まれた文字列がサポートされます。単一引用符を使用すると、誤った結果が生じます。get_json_object 関数でパスに * を使用すると、NULL が返されます。
  • Parquet 読み取り構成:
    • ネイティブ クエリ実行では、spark.files.ignoreCorruptFilestrue に設定されていても、デフォルトの false 値に設定されているとみなされます。
    • ネイティブ クエリ実行では spark.sql.parquet.datetimeRebaseModeInRead は無視され、Parquet ファイルの内容のみが返されます。従来のハイブリッド カレンダーとプロレプティック グレゴリオ暦の違いは考慮されません。Spark の結果が異なる場合があります。
  • NaN: 対象外です。たとえば、数値比較で NaN を使用すると、予期しない結果が生じる可能性があります。
  • Spark の列指向読み取り: Spark の列指向ベクトルがネイティブ クエリ実行と互換性がないため、致命的なエラーが発生する可能性があります。
  • スピル: シャッフル パーティションを大きな数に設定すると、 ディスクへのスピル機能によって OutOfMemoryException がトリガーされる可能性があります。この場合は、パーティションの数を減らすことで、この例外を解消できます。

次のステップ

  • Lightning Engine で Spark バッチ ワークロードとセッションを高速化する。