GPU を使用する

Managed Service for Apache Spark バッチ ワークロードに GPU アクセラレータを接続すると、次の結果が得られます。

  • 大規模なデータ分析ワークロードの処理を高速化します。

  • GPU ML ライブラリを使用して、大規模なデータセットでのモデル トレーニングを高速化します。

  • 動画や自然言語処理などの高度なデータ分析を実行します。

サポートされている Managed Service for Apache Spark Spark ランタイムはすべて、各ワークロード ノードに Spark RAPIDS ライブラリを追加します。Managed Service for Apache Spark Spark ランタイム バージョン 1.1 では、ワークロード ノードに XGBoost ライブラリも追加されます。これらのライブラリは、GPU 高速化ワークロードで使用できる強力なデータ変換ツールと機械学習ツールを提供します。

GPU のメリット

Managed Service for Apache Spark の Spark ワークロードで GPU を使用するメリットは次のとおりです。

  • パフォーマンスの向上: GPU アクセラレーションにより、特に ML やディープ ラーニング、グラフ処理、複雑な分析などのコンピューティング負荷の高いタスクで、Spark ワークロードのパフォーマンスを大幅に向上させることができます。

  • モデル トレーニングの高速化: ML タスクの場合、GPU を接続すると、モデルのトレーニングに必要な時間を大幅に短縮できます。これにより、データ サイエンティストやエンジニアは迅速に反復処理と試験運用を行うことができます。

  • スケーラビリティ: 複雑化する処理ニーズに対応するため、GPU ノードを追加したり、ノードに強力な GPU を追加したりできます。

  • 費用対効果: GPU には初期投資が必要ですが、処理時間の短縮とリソース使用率の向上により、長期的にコストを削減できます。

  • データ分析の強化: GPU アクセラレーションにより、大規模なデータセットで画像や動画の分析、自然言語処理などの高度な分析を実行できます。

  • 製品の改善: 処理の高速化により、意思決定の迅速化とアプリケーションの応答性の向上を実現します。

制限事項と考慮事項

料金

GPU アクセラレータは、プレミアム料金階層で利用できます。アクセラレータの料金については、Managed Service for Apache Spark の料金をご覧ください。

始める前に

GPU アクセラレータがアタッチされたサーバーレス Batch ワークロードを作成する前に、次の操作を行います。

  1. Google Cloud アカウントにログインします。 Google Cloudを初めて使用する場合は、 アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  5. Google Cloud CLI をインストールします。

  6. 外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。

  7. gcloud CLI を初期化するには、次のコマンドを実行します。

    gcloud init
  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  9. Verify that billing is enabled for your Google Cloud project.

  10. Enable the Dataproc, Compute Engine, and Cloud Storage APIs.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the APIs

  11. Google Cloud CLI をインストールします。

  12. 外部 ID プロバイダ(IdP)を使用している場合は、まず連携 ID を使用して gcloud CLI にログインする必要があります。

  13. gcloud CLI を初期化するには、次のコマンドを実行します。

    gcloud init
  14. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  15. [ 作成] をクリックします。
  16. [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
    1. [始める] セクションで、次の操作を行います。
      • バケット名の要件を満たす、グローバルに一意の名前を入力します。
      • バケットラベルを追加するには、[ラベル] セクション()を開き、[ ラベルを追加] をクリックして、ラベルの keyvalue を指定します。
    2. [データの保存場所の選択] セクションで、次の操作を行います。
      1. ロケーション タイプを選択してください。
      2. [ロケーション タイプ] プルダウン メニューから、バケットのデータが永続的に保存されるロケーションを選択します。
      3. クロスバケット レプリケーションを設定するには、[Storage Transfer Service 経由でクロスバケット レプリケーションを追加する] を選択し、次の手順を実施します。

        クロスバケット レプリケーションを設定する

        1. [バケット] メニューで、バケットを選択します。
        2. [レプリケーション設定] セクションで、[構成] をクリックして、レプリケーション ジョブの設定を構成します。

          [クロスバケット レプリケーションを構成する] ペインが表示されます。

          • オブジェクト名の接頭辞で複製するオブジェクトをフィルタするには、オブジェクトを追加または除外する接頭辞を入力し、 [接頭辞を追加] をクリックします。
          • 複製されたオブジェクトのストレージ クラスを設定するには、[ストレージ クラス] メニューからストレージ クラスを選択します。この手順をスキップすると、複製されたオブジェクトはデフォルトで宛先バケットのストレージ クラスを使用します。
          • [完了] をクリックします。
    3. [データの保存場所を選択する] セクションで、次の操作を行います。
      1. バケットのデフォルトのストレージ クラスを選択するか、バケットデータのストレージ クラスを自動的に管理する Autoclass を選択します。
      2. 階層名前空間を有効にするには、[データ量が多いワークロード向けにストレージを最適化] セクションで、[このバケットで階層的な名前空間を有効にする] を選択します。
    4. [オブジェクトへのアクセスを制御する方法を選択する] セクションで、バケットに公開アクセスの防止を適用するかどうかを選択し、バケットのオブジェクトに使用するアクセス制御方法を選択します。
    5. [オブジェクト データを保護する方法を選択する] セクションで、次の操作を行います。
      • [データ保護] で、バケットに設定するオプションを選択します。
        • 削除(復元可能)を有効にするには、[削除(復元可能)ポリシー(データ復元用)] チェックボックスをオンにして、削除後にオブジェクトを保持する日数を指定します。
        • オブジェクトのバージョニングを設定するには、[オブジェクトのバージョニング(バージョン管理用)] チェックボックスをオンにして、オブジェクトごとの最大バージョン数と、非現行バージョンの有効期限が切れるまでの日数を指定します。
        • オブジェクトとバケットで保持ポリシーを有効にするには、[保持(コンプライアンス用)] チェックボックスをオンにして、次の操作を行います。
          • オブジェクト保持ロックを有効にするには、[オブジェクト保持を有効にする] チェックボックスをオンにします。
          • バケットロックを有効にするには、[バケット保持ポリシーを設定] チェックボックスをオンにして、保持期間の単位と期間を選択します。
      • オブジェクト データの暗号化方法を選択するには、[データ暗号化] セクション()を開き、データ暗号化方法を選択します。
  17. [作成] をクリックします。

GPU アクセラレータを使用してサーバーレス バッチ ワークロードを作成する

NVIDIA L4 GPU を使用する Managed Service for Apache Spark バッチ ワークロードを送信して、並列化された PySpark タスクを実行します。gcloud CLI を使用して次の操作を行います。

  1. [展開する] をクリックし、テキストまたはコードエディタを使用して、一覧表示された PySpark コードを作成し、ローカルマシンの test-py-spark-gpu.py ファイルに保存します。

    #!/usr/bin/env python
    
    """S8s Accelerators Example."""
    
    import subprocess
    from typing import Any
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col
    from pyspark.sql.types import IntegerType
    from pyspark.sql.types import StructField
    from pyspark.sql.types import StructType
    
    spark = SparkSession.builder.appName("joindemo").getOrCreate()
    
    
    def get_num_gpus(_: Any) -> int:
      """Returns the number of GPUs."""
      p_nvidia_smi = subprocess.Popen(
          ["nvidia-smi", "-L"], stdin=None, stdout=subprocess.PIPE
      )
      p_wc = subprocess.Popen(
          ["wc", "-l"],
          stdin=p_nvidia_smi.stdout,
          stdout=subprocess.PIPE,
          stderr=subprocess.PIPE,
          universal_newlines=True,
      )
      [out, _] = p_wc.communicate()
      return int(out)
    
    
    num_workers = 5
    result = (
        spark.sparkContext.range(0, num_workers, 1, num_workers)
        .map(get_num_gpus)
        .collect()
    )
    num_gpus = sum(result)
    print(f"Total accelerators: {num_gpus}")
    
    # Run the join example
    schema = StructType([StructField("value", IntegerType(), True)])
    df = (
        spark.sparkContext.parallelize(range(1, 10000001), 6)
        .map(lambda x: (x,))
        .toDF(schema)
    )
    df2 = (
        spark.sparkContext.parallelize(range(1, 10000001), 6)
        .map(lambda x: (x,))
        .toDF(schema)
    )
    joined_df = (
        df.select(col("value").alias("a"))
        .join(df2.select(col("value").alias("b")), col("a") == col("b"))
        .explain()
    )
  2. ローカルマシンで gcloud CLI を使用して、各ワーカーを L4 GPU で高速化した 5 ワーカーで、Managed Service for Apache Spark サーバーレス バッチジョブを送信します。

    gcloud dataproc batches submit pyspark test-py-spark-gpu.py \
        --project=PROJECT_ID \
        --region=REGION \
        --deps-bucket=BUCKET_NAME \
        --version=1.1 \
        --properties=spark.dataproc.executor.compute.tier=premium,spark.dataproc.executor.disk.tier=premium,spark.dataproc.executor.resource.accelerator.type=l4,spark.executor.instances=5,spark.dataproc.driverEnv.LANG=C.UTF-8,spark.executorEnv.LANG=C.UTF-8,spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager
    

注:

  • PROJECT_ID: 実際の Google Cloud プロジェクト ID。
  • REGION: ワークロードを実行できる利用可能な Compute Engine リージョン
  • BUCKET_NAME: Cloud Storage バケットの名前。Spark は、バッチ ワークロードを実行する前に、ワークロードの依存関係をこのバケットの /dependencies フォルダにアップロードします。
  • --version: すべてのサポートされている Managed Service for Apache Spark ランタイムで、GPU 高速化ワークロードの各ノードに RAPIDS ライブラリが追加されます。ランタイム バージョン 1.1 のみが、GPU 高速化ワークロードの各ノードに XGBoost ライブラリを追加します。
  • --propertiesSpark リソース割り当てプロパティを参照):

    • spark.dataproc.driverEnv.LANG=C.UTF-8spark.executorEnv.LANG=C.UTF-8 (2.2 より前のランタイム バージョンでは必須): これらのプロパティは、デフォルトの文字セットを C.UTF-8 に設定します。
    • spark.dataproc.executor.compute.tier=premium(必須): GPU アクセラレータ ワークロードは、プレミアム データ コンピューティング ユニット(DCU)を使用して課金されます。Managed Service for Apache Spark のアクセラレータの料金をご覧ください。

    • spark.dataproc.executor.disk.tier=premium(必須): A100-40、A100-80、または L4 アクセラレータを使用するノードは、プレミアム ディスク ティアを使用する必要があります。

    • spark.dataproc.executor.resource.accelerator.type=l4(必須): GPU タイプは 1 つだけ指定する必要があります。このジョブの例では、L4 GPU を選択しています。次のアクセラレータ タイプでは、次の引数名を指定できます。

      GPU のタイプ 引数名
      A100 40GB a100-40
      A100 80GB a100-80

    • spark.executor.instances=5(必須): 2 以上にする必要があります。この例では 5 に設定します。

    • spark.executor.cores(省略可): このプロパティを設定して、コア vCPU の数を指定できます。L4 GPU の有効な値は、デフォルトの 4、または 81216244896 です。A100 GPU の有効なデフォルト値は 12 のみです。L4 GPU と 244896 コアを含む構成では、各エグゼキュータに 248 GPU がアタッチされています。他のすべての構成には 1 GPU が接続されています。

    • spark.dataproc.executor.disk.size(必須): L4 GPU のディスクサイズは 375 GB に固定されています。ただし、244896 コアを含む構成では、それぞれ 7501,5003,000 GB になります。L4 高速化ワークロードの送信時に、このプロパティを別の値に設定すると、エラーが発生します。A100 40 または A100 80 GPU を選択した場合、有効なサイズは 375g、750g、1500g、3000g、6000g、9000g です。

    • spark.executor.memory(省略可)と spark.executor.memoryOverhead(制限あり): メモリは設定できますが、memoryOverhead は設定できません。設定されたプロパティで使用されていない使用可能なメモリの量は、設定されていないプロパティに適用されます。spark.executor.memoryOverhead は、PySpark バッチ ワークロードの使用可能なメモリの 40% に設定され、他のワークロードの場合は 10% に設定されます(Spark のリソース割り当てプロパティをご覧ください)。

      次の表に、A100 と L4 のさまざまな GPU 構成で設定できる最大メモリ容量を示します。どちらのプロパティの最小値も 1024 MB です。

      A100 (40 GB) A100 (80 GB) L4(4 コア) L4(8 コア) L4(12 コア) L4(16 コア) L4(24 コア) L4(48 コア) L4(96 コア)
      最大合計メモリ(MB) 78040 165080 13384 26768 40152 53536 113072 160608 321216
    • Spark RAPIDS プロパティ(省略可): デフォルトでは、Managed Service for Apache Spark は次の Spark RAPIDS プロパティ値を設定します。

      • spark.plugins=com.nvidia.spark.SQLPlugin
      • spark.executor.resource.gpu.amount=1
      • spark.task.resource.gpu.amount=1/$spark_executor_cores
      • spark.shuffle.manager=''。デフォルトでは、このプロパティは設定されていません。パフォーマンス向上のために GPU を使用する場合には、RAPIDS シャッフル マネージャーを有効にすることが NVIDIA により推奨されています。これを行うには、ワークロードを送信するときに spark.shuffle.manager=com.nvidia.spark.rapids.RapidsShuffleManager を設定します。
      • spark.rapids.sql.concurrentGpuTasks= 最小値(gpuMemoryinMB / 8、4)
      • spark.rapids.shuffle.multiThreaded.writer.threads= min (VM の CPU コア数 / VM あたりの GPU 数, 32)
      • spark.rapids.shuffle.multiThreaded.reader.threads= min (VM の CPU コア数 / VM あたりの GPU 数, 32)

      Spark RAPIDS プロパティを設定するには、Apache Spark 構成用の RAPIDS アクセラレータをご覧ください。また、Spark 詳細プロパティを設定するには、Apache Spark 詳細構成用の RAPIDS アクセラレータをご覧ください。