以降のセクションでは、Managed Service for Apache Spark Spark アプリケーションを微調整するためのヒントを紹介します。
エフェメラル クラスタを使用する
Managed Service for Apache Spark の「エフェメラル」クラスタモデルを使用する場合、ジョブごとに専用クラスタを作成し、ジョブが終了したらクラスタを削除します。 エフェメラル モデルを使用すると、ストレージとコンピューティングを別々に処理し、ジョブの入力データと出力データを Cloud Storage または BigQuery に保存し、コンピューティングと一時データ ストレージのみにクラスタを使用できます。
永続クラスタの注意点
1 つのジョブのエフェメラル クラスタを使用すると、共有クラスタおよび長時間実行される「永続」クラスタの使用に伴う次のような注意点と潜在的な問題を回避できます。
- 単一障害点: 共有クラスタのエラー状態が原因で、すべてのジョブが失敗し、データ パイプライン全体がブロックされることがあります。エラーの調査と復旧には数時間かかる場合があります。エフェメラル クラスタはクラスタ内の一時的な状態のみを保持するため、エラーが発生した場合は、すばやく削除して再作成できます。
- HDFS、MySQL、またはローカル ファイル システムでクラスタの状態の維持および移行が困難
- SLO に悪影響を及ぼすジョブ間のリソース競合
- メモリ負荷によりサービス デーモンが応答しない
- ディスク容量を超える可能性のあるログと一時ファイルの蓄積
- クラスタゾーンの在庫切れによるスケールアップの失敗
- 古いクラスタ イメージ バージョンがサポートされていない。
エフェメラル クラスタのメリット
エフェメラル クラスタには次のような利点があります。
- 異なる 異なる Managed Service for Apache Spark VM サービス アカウントで、ジョブごとに異なる IAM 権限を構成します。
- ジョブごとにクラスタのハードウェアとソフトウェアの構成を最適化し、必要に応じてクラスタ構成を変更します。
- 新しいクラスタでイメージ バージョンをアップグレードして、最新のセキュリティ パッチ、バグの修正、最適化を取得します。
- 分離された単一ジョブクラスタで問題のトラブルシューティングをより迅速に行います。
- 共有クラスタのジョブ間のアイドル状態時間ではなく、エフェメラル クラスタの実行時間のみを支払うことで、費用を節約できます。
Spark SQL の使用
Spark SQL DataFrame API は、RDD API を大幅に最適化したものです。RDD を使用するコードを操作する場合は、RDD をコードに渡す前に DataFrame としてデータを読み取ることを検討してください。Java や Scala のコードでは、RDD とデータフレームのスーパーセットとして Spark SQL Dataset API の使用を検討してください。
Apache Spark 3 の使用
Managed Service for Apache Spark 2.0 で Spark 3 をインストールすると、次の機能とパフォーマンスが 改善されます。
- GPU のサポート
- バイナリ ファイルの読み取り機能
- パフォーマンスの改善
- 動的パーティション プルーニング
- 適応型クエリ実行。Spark ジョブをリアルタイムで最適化します。
ダイナミック アロケーションの使用
Apache Spark には、クラスタ内のワーカーで Spark エグゼキュータの数をスケーリングする動的割り当て 機能が含まれています。この機能を使用すると、クラスタがスケールアップされても、ジョブが Managed Service for Apache Spark クラスタ全体を使用できます。この機能は、デフォルトで Managed Service for Apache Spark
で有効になっています(spark.dynamicAllocation.enabled は true に設定されています)。詳細は、
Spark ダイナミック アロケーション
をご覧ください。
Managed Service for Apache Spark の自動スケーリングを使用する
Managed Service for Apache Spark 自動スケーリング では、クラスタの Managed Service for Apache Spark ワーカーの数が動的に増減されるため、 Spark ジョブで迅速に完了するために必要なリソースを確保できます。
セカンダリ ワーカーのみをスケーリングするように自動スケーリング ポリシーを構成することがベスト プラクティス となります。
Managed Service for Apache Spark の高度な柔軟性モードを使用する
プリエンプティブル VM または自動スケーリング ポリシーを持つクラスタは、ワーカーをプリエンプトまたは削除する場合、削減指定子へのシャッフル データの提供が完了する前に、FetchFailed の例外を受け取ることがあります。この例外が発生すると、タスクの再試行が発生したり、ジョブの完了時間が長くなったりする可能性があります。
推奨事項: Managed Service for Apache Spark 高度な柔軟性モードを使用してください。 このモードを使用すると、セカンダリ ワーカーに中間シャッフル データが保存されないため、 セカンダリ ワーカーを安全にプリエンプトまたはスケールダウンできます。
パーティショニングとシャッフルの構成
Spark は、クラスタの一時パーティションにデータを保存します。アプリケーションが DataFrame をグループ化または結合すると、グループと低レベル構成に従って新しいパーティション内にデータがシャッフルされます。
データ パーティショニングは、アプリケーションのパフォーマンスに大きな影響を与えます。パーティションが少なすぎると、ジョブの並列処理とクラスタ リソースの使用率が制限されます。パーティションが多すぎると、追加のパーティション処理とシャッフルのためにジョブの速度が低下します。
パーティションの構成
パーティションの数とサイズは、次のプロパティで決まります。
spark.sql.files.maxPartitionBytes: Cloud Storage からデータを読み取るときのパーティションの最大サイズ。デフォルトは 128 MB です。これは、100 TB 未満の処理を行うほとんどのアプリケーションには十分な大きさです。spark.sql.shuffle.partitions: シャッフル実行後のパーティションの数。デフォルトは、2.2以降のイメージ バージョン クラスタの場合は1000です。推奨: この値を、クラスタ内の vCPU 数の 3 倍に設定します。spark.default.parallelism: シャッフルが必要な RDD 変換(join、reduceByKey、parallelizeなど)を実行した後に返されるパーティションの数。デフォルトは、クラスタ内の vCPU の合計数です。Spark ジョブで RDD を使用する場合、この数は vCPU の 3 倍に設定します。
ファイル数を制限する
Spark で多数の小さなファイルを読み取ると、パフォーマンスが低下します。256 MB~512 MB の範囲のファイルサイズなど、より大きなファイルサイズでデータを保存します。 同様に、出力ファイルの数を制限してください(シャッフルを強制するには、不要なシャッフルの回避をご覧ください)。
アダプティブ クエリ実行の構成(Spark 3)
適応型クエリ実行 (Managed Service for Apache Spark イメージ バージョン 2.0 でデフォルトで有効になっています) を行うと、次のような Spark ジョブのパフォーマンスが向上します。
ほとんどのユースケースにはデフォルトの構成設定で問題ありませんが、spark.sql.adaptive.advisoryPartitionSizeInBytes を spark.sql.files.maxPartitionBytes(デフォルトは 128 MB)に設定すると便利です。
不要なシャッフルの回避
Spark を使用すると、シャッフルを手動でトリガーし、repartition 関数を使用してデータを再調整できます。シャッフルは負荷が大きいため、データの再シャッフルは慎重に使用される必要があります。Spark でデータを自動的にパーティショニングできるようにするには、パーティション構成を適切に設定するだけで十分です。
例外: 列パーティション分割データを Cloud Storage に書き込むときに、特定の列を再パーティショニングすることで、多数の小さなファイルの書き込みが回避され、書き込み時間を短縮できます。
df.repartition("col_name").write().partitionBy("col_name").save("gs://...")
Parquet または Avro にデータを保存する
Spark SQL は、デフォルトで Snappy 圧縮 Parquet ファイルのデータの読み取りと書き込みを行います。Parquet は効率的なカラム型ファイル形式であり、Spark でアプリケーションの実行に必要なデータのみを読み取ることができます。これは、大規模なデータセットを扱う場合に非常に重要になります。Apache ORC など、他のカラム型形式も効果的です。
カラム型ではないデータの場合、 Apache Avro が 効率的なバイナリ行ファイル形式を提供します。通常は Parquet より時間がかかりますが、Avro のパフォーマンスはテキストベースの形式(CSV や JSON など)より優れています。
ディスクサイズの最適化
永続ディスクのスループットはディスクサイズに応じてスケーリングされます。ジョブはメタデータを書き込み、データをディスクにシャッフルするため、Spark ジョブのパフォーマンスに影響を与える可能性があります。標準の永続ディスクを使用する場合、ディスクサイズはワーカーあたり少なくとも 1 テラバイトである必要があります(永続ディスクサイズによるパフォーマンスを参照)。
Google Cloud コンソールでワーカーのディスク スループットをモニタリングするには、次のようにします。
- [クラスタ] ページでクラスタ名をクリックします。
- [VM インスタンス] タブをクリックします。
- ワーカー名をクリックします。
- [モニタリング] タブをクリックして、[ディスク スループット] までスクロールし、ワーカー スループットを表示します。
ディスクに関する考慮事項
永続ストレージのメリットがないエフェメラル Managed Service for Apache Spark クラスタでは、ローカル SSD を使用できます。ローカル SSD は、クラスタに物理的に接続され、永続ディスクよりもスループットが高くなります (パフォーマンス表をご覧ください)。 ローカル SSD は、375 GB の固定サイズで利用できますが、複数の SSD を追加してパフォーマンスを改善することもできます。
ローカル SSD は、クラスタのシャットダウン後にデータを保持しません。永続 ストレージが必要な場合は、SSD 永続ディスクを使用して、 標準永続ディスクよりもサイズに対するスループット を高めることができます。パーティション サイズが 8 KB 未満の場合は、SSD 永続ディスクも適しています(ただし、小さいパーティションは避けてください)。
クラスタに GPU を接続する
Spark 3 は GPU をサポートしています。RAPIDS 初期化アクションで GPU を使用して、RAPIDS SQL Accelerator を使用して Spark ジョブを高速化します。GPU を使用するクラスタを構成するには、GPU ドライバの初期化アクションを使用します。
一般的なジョブのエラーと解決策
次のセクションでは、ジョブの失敗の一般的な原因と解決策について説明します。
メモリ不足
例:
- 「Lost executor」(エグゼキュータがなくなりました)
- 「java.lang.OutOfMemoryError: GC overhead limit exceeded」(GC のオーバーヘッド上限を超えました)
- 「Container killed by YARN for exceeding memory limits」(コンテナの上限超過のため YARN によって強制終了されました)
考えられる解決策:
- PySpark を使用している場合は、
spark.executor.memoryOverheadを上げ、spark.executor.memoryを下げます。 - ハイメモリ マシンタイプを使用します。
- 小さいパーティションを使用します。
シャッフル フェッチのエラー
例:
- 「FetchFailedException」(Spark エラー)
- 「Failed to connect to...」(Spark エラー)
- 「Failed to fetch」(取得できませんでした)(MapReduce エラー)
通常、まだ処理するシャッフル データがあるワーカーの早期削除が原因です。
考えられる原因と修正点:
- プリエンプティブル ワーカー VM が再要求された、または非プリエンプティブル ワーカー VM がオートスケーラーによって削除されました。解決策: 高度な柔軟性モードを使用して、セカンダリ ワーカーを安全にプリエンプティブルまたはスケーラブルにします。
- OutOfMemory エラーによりエグゼキュータまたはマッパーがクラッシュしました。解決策: エグゼキュータまたはマッパーのメモリを増やします。
- Spark シャッフル サービスが過負荷状態になっている可能性があります。解決策: ジョブ パーティションの数を減らします。
YARN ノードが UNHEALTHY
YARN ログからの例:
...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]
多くの場合、データをシャッフルするためのディスク容量の不足に関連しています。ログファイルを表示して診断します。
- Google Cloud コンソールでプロジェクトの [クラスタ] ページを開き、クラスタの名前をクリックします。
- [ログを表示] をクリックします。
hadoop-yarn-nodemanagerでログをフィルタします。- 「UNHEALTHY」で検索します。
考えられる解決策:
- ユーザー キャッシュは、
yarn-site.xml fileのyarn.nodemanager.local-dirsプロパティで指定したディレクトリに保存されます。このファイルは/etc/hadoop/conf/yarn-site.xmlにあります。/hadoop/yarn/nm-local-dirパスの空き容量を確認し、/hadoop/yarn/nm-local-dir/usercacheユーザー キャッシュ フォルダを削除して空き容量を確保します。 - ログに「UNHEALTHY」というステータスが報告される場合は、より大きなディスク空き容量を使用してクラスタを再作成し、スループットの上限を引き上げます。
ユーザー キャッシュがディスク容量を占有している
Managed Service for Apache Spark では、ユーザー キャッシュは
yarn.nodemanager.local-dirs プロパティで指定したディレクトリに保存されます。yarn-site.xml
(/etc/hadoop/conf/yarn-site.xml)。ほとんどの場合、場所は
/hadoop/yarn/nm-local-dir です。
考えられる解決策:
- 手動クリーンアップ:
/hadoop/yarn/nm-local-dirパスの空き容量を確認し、/hadoop/yarn/nm-local-dir/usercacheユーザー キャッシュ フォルダを削除して空き容量を確保します。 長期的な解決策: 次の クラスタ プロパティを設定して、クリーンアップ プロセスを管理します。
yarn.nodemanager.localizer.cache.cleanup.interval-msyarn.nodemanager.localizer.cache.target-size-mb
クラスタを再作成しない場合は、すべてのノードの
yarn-site.xmlでこれらのプロパティを更新し、ワーカーノードで次のコマンドを実行して NodeManager を再起動します。sudo systemctl restart hadoop-yarn-nodemanager.serviceクラスタのサイズ変更: ログに「UNHEALTHY」というステータスが報告される場合は、より大きなディスク空き容量を使用してクラスタを再作成し、スループットの上限を引き上げます。
ドライバのメモリ不足によるジョブの失敗
クラスタモードでジョブを実行する際、ワーカーノードのメモリ容量が不足しているとジョブは失敗します。
ドライバログの例:
'Exception in thread "main" java.lang.IllegalArgumentException: Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum -allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'
考えられる解決策:
spark:spark.driver.memoryをyarn:yarn.scheduler.maximum-allocation-mbよりも小さく設定します。- マスターノードとワーカーノードに同じマシンタイプを使用します。
次のステップ
- Spark パフォーマンスの調整の詳細を確認する。