Managed Service for Apache Spark の自動スケーリング

このドキュメントでは、Managed Service for Apache Spark の自動スケーリングについて説明します。Spark ワークロードを送信すると、Managed Service for Apache Spark は、エグゼキュータの数などのワークロード リソースを動的にスケーリングして、ワークロードを効率的に実行できます。Managed Service for Apache Spark の自動スケーリングはデフォルトの動作であり、Spark 動的リソース割り当てを使用して、ワークロードをスケーリングするかどうか、またその方法とタイミングを決定します。

Managed Service for Apache Spark の自動スケーリング V2

Managed Service for Apache Spark 自動スケーリング バージョン 2(V2)では、デフォルト バージョン 1(V1)に機能と改善を追加して、Managed Service for Apache Spark ワークロードの管理、ワークロードのパフォーマンスの改善、費用の削減を行います。

  • 非同期ノードのスケールダウン: 自動スケーリング V2 は、V1 の同期スケールダウンを非同期スケールダウンに置き換えます。非同期ダウン スケーリングを使用すると、Managed Service for Apache Spark は、すべてのノードがシャッフル移行を完了するのを待たずに、ワークロード リソースをダウン スケールします。つまり、スケールダウンが遅いロングテール ノードがアップスケーリングをブロックすることはありません。
  • インテリジェントなスケールダウンノード選択: 自動スケーリング V2 は、V1 のランダムなノード選択を、最初にスケールダウンするのに最適なノードを特定するインテリジェントなアルゴリズムで置き換えます。このアルゴリズムでは、ノードのシャッフル データサイズやアイドル時間などの要素が考慮されます。
  • 構成可能な Spark の正常なデコミッションとシャッフル移行の動作: 自動スケーリング V2 では、標準の Spark プロパティを使用して、Spark の正常なデコミッションとシャッフル移行を構成できます。この機能は、カスタマイズされた Spark プロパティとの移行の互換性を維持するのに役立ちます。

Managed Service for Apache Spark の自動スケーリング機能

機能 Managed Service for Apache Spark Autoscaling V1 Managed Service for Apache Spark Autoscaling V2
ノードのダウン スケーリング 同期 非同期
ダウン スケーリングのノード選択 ランダム インテリジェント
Spark の正常なデコミッションとシャッフル移行 構成不可 構成可能

Spark の動的割り当てのプロパティ

次の表は、バッチ ワークロードを送信して自動スケーリングを制御する際に設定できる Spark の動的割り当てプロパティの一覧です(Spark プロパティの設定方法をご覧ください)。

プロパティ 説明 デフォルト
spark.dataproc.scaling.version Managed Service for Apache Spark の自動スケーリング バージョン。バージョン 1 または 2 を指定します(Managed Service for Apache Spark 自動スケーリング V2 をご覧ください)。 1
spark.dynamicAllocation.enabled ワークロードに基づいてエグゼキュータの数をスケールアップまたはスケールダウンする動的リソース割り当てを使用するかどうか。 値を false に設定すると、ワークロードの自動スケーリングが無効になります。デフォルト: true true
spark.dynamicAllocation.initialExecutors ワークロードに割り当てられるエグゼキュータの初期数。ワークロードが開始されると、自動スケーリングによってアクティブなエグゼキュータの数が変化することがあります。最小値は 2 で、最大値は 2000 です。 2
spark.dynamicAllocation.minExecutors ワークロードをスケールダウンするエグゼキュータの最小数。最小値は 2 です。 2
spark.dynamicAllocation.maxExecutors ワークロードをスケールアップするエグゼキュータの最大数。最大値は 2000 です。 1000
spark.dynamicAllocation.executorAllocationRatio Spark ワークロードのスケールアップをカスタマイズします。01 の値を受け付けます。値 1.0 は、スケールアップ機能を最大化し、並列処理を最大化するのに役立ちます。値 0.5 では、スケールアップ機能と並列処理が最大値の半分に設定されます。 0.3
spark.dynamicAllocation.diagnosis.enabled true の場合、実行中の実行プログラムが spark.dynamicAllocation.diagnosis.interval で指定された期間の最大必要実行プログラム数を超えると、診断情報がログに記録されます。診断には、アイドル状態のエグゼキュータ数とアイドル時間のパーセンタイル、アクティブなタスクの分布、シャッフル データサイズ、キャッシュに保存された RDD サイズを含むエグゼキュータの概要が含まれます。spark.dynamicAllocation.diagnosis.logLevel を使用して、出力ログレベルを制御します。 false
spark.dynamicAllocation.profile performance または cost に設定して、パフォーマンスまたは費用対効果に最適化された事前定義済みの構成セットを適用します。ユーザー定義のプロパティは、プロファイルのデフォルトをオーバーライドします。詳細については、Spark の動的割り当てプロファイルをご覧ください。 none
spark.dynamicAllocation.shuffleTracking.dynamicTimeout.enabled true の場合、シャッフル データを保持するエグゼキュータの動的タイムアウト計算を有効にします。静的な spark.dynamicAllocation.shuffleTracking.timeout を使用する代わりに、タイムアウトはエグゼキュータに保存されているシャッフル データの量に基づいて計算されます。これにより、シャッフルが小さいエグゼキュータはより早くリリースされ、シャッフルが大きいエグゼキュータはより長く存続します。 false
spark.reducer.fetchMigratedShuffle.enabled true に設定すると、Spark 動的割り当てによって廃止されたエグゼキュータからのフェッチが失敗した後、Spark ドライバからシャッフル出力のロケーションをフェッチできるようになります。これにより、廃止されたエグゼキュータからライブ エグゼキュータへのシャッフル ブロックの移行によって発生する ExecutorDeadException エラーが減り、FetchFailedException エラーによって発生するステージの再試行が少なくなります(ExecutorDeadException による FetchFailedException をご覧ください)。このプロパティは、Managed Service for Apache Spark の Spark ランタイム バージョン 1.1.12 以降と 2.0.20 以降で使用できます。 false
spark.scheduler.excludeShuffleSkewExecutors true の場合、シャッフル データ量が多い、または完了したマップタスクの数が多いエグゼキュータであるシャッフル スキュー エグゼキュータでタスクをスケジュールしないようにします。これにより、シャッフル スキューが軽減され、パフォーマンスが向上します。 false

Spark 動的割り当てプロファイル

spark.dynamicAllocation.profile プロパティを performance または cost に設定すると、パフォーマンスまたは費用対効果に最適化された Spark 構成の事前定義されたセットが適用されます。spark.dynamicAllocation.profile プロパティの設定に加えて Spark プロパティを設定すると、これらのプロパティのプロファイルのデフォルトがオーバーライドされます。

performance: このプロファイルは、次のデフォルト設定を適用して、実行時間を最小限に抑えるように最適化します。

  • spark.scheduler.excludeShuffleSkewExecutors: true
  • spark.dynamicAllocation.executorIdleTimeout: 300s
  • spark.dynamicAllocation.initialExecutors: 10

cost: このプロファイルは、次のデフォルト設定を適用して、リソース消費の削減を最適化します。

  • spark.dynamicAllocation.executorIdleTimeout: 120s
  • spark.dynamicAllocation.cachedExecutorIdleTimeout: 120s
  • spark.dynamicAllocation.shuffleTracking.dynamicTimeout.enabled: true
  • spark.dynamicAllocation.diagnosis.enabled: true

Spark 動的割り当ての指標

Spark バッチ ワークロードは、Spark の動的リソース割り当てに関連する次の指標を生成します(Spark 指標の詳細については、モニタリングと計測をご覧ください)。

指標 説明
maximum-needed 実行中のタスクと保留中のタスクをすべて達成するために現在の負荷で必要なエグゼキュータの最大数。
running タスクを実行している稼働中のエグゼキュータ数。

Spark 動的割り当ての問題と解決策

  • ExecutorDeadException による FetchFailedException

    原因: Spark 動的割り当てがエグゼキュータをスケールダウンすると、シャッフル ファイルがライブ エグゼキュータに移行します。ただし、エグゼキュータの Spark レデューサ タスクは、レデューサ タスクの開始時に Spark ドライバによって設定されたロケーションからシャッフル出力をフェッチするため、シャッフル ファイルが移行されても、レデューサは引き続き廃止されたエグゼキュータからのシャッフル出力のフェッチを試行できます。これにより、ExecutorDeadExceptionFetchFailedException エラーが発生します。

    解決策: Managed Service for Apache Spark バッチ ワークロードを実行するときに、spark.reducer.fetchMigratedShuffle.enabledtrue に設定して、シャッフル ロケーションの再取得を有効にします(Spark バッチ ワークロードのプロパティの設定をご覧ください)。このプロパティを有効にすると、廃止されたエグゼキュータからのフェッチに失敗した後、レデューサ タスクはドライバからシャッフル出力のロケーションを再取得します。