Spark バッチ ワークロードの自動チューニング

このドキュメントでは、Apache Spark 向け Serverless のバッチ ワークロードの自動チューニングについて説明します。Spark ワークロードのパフォーマンスと復元力を最適化することは、Spark 構成オプションの数と、これらのオプションがワークロードに与える影響を評価することの難しさから、困難な場合があります。Apache Spark 向けサーバーレスの自動チューニングは、Spark の最適化のベスト プラクティスとワークロード実行(コホート)の分析に基づいて、繰り返される Spark ワークロードに Spark 構成設定を自動的に適用することで、手動ワークロード構成に代わる手段を提供します。

Serverless for Apache Spark の自動チューニングに登録する

このページで説明する Serverless for Apache Spark 自動チューニングプレビュー リリースへのアクセスに登録するには、Dataproc プレビュー アクセス リクエストの登録フォームに記入して送信してください。フォームが承認されると、フォームにリストされているプロジェクトはプレビュー機能にアクセスできるようになります。

利点

Apache Spark 向け Serverless の自動チューニングには、次のようなメリットがあります。

  • 自動最適化: 効率の悪い Apache Spark 向け Serverless バッチと Spark 構成を自動的に調整し、ジョブの実行時間を短縮できます。
  • 履歴学習: 繰り返し実行から学習し、ワークロードに合わせて調整された推奨事項を適用します。

自動チューニング コホート

自動チューニングは、バッチ ワークロードの定期的な実行(コホート)に適用されます。

バッチ ワークロードを送信するときに指定するコホート名は、繰り返しワークロードの連続実行の 1 つとして識別されます。

自動チューニングは、次のようにバッチ ワークロード コホートに適用されます。

  • 自動チューニングは、ワークロードの 2 回目以降のコホートに対して計算され、適用されます。Apache Spark 用サーバーレスの自動チューニングでは、最適化にワークロード履歴が使用されるため、繰り返されるワークロードの初回実行には自動チューニングは適用されません。

  • 自動チューニングは、実行中のワークロードに遡って適用されることはありません。新しく送信されたワークロードにのみ適用されます。

  • 自動チューニングは、コホート統計情報を分析することで、時間の経過とともに学習して改善します。システムが十分なデータを収集できるように、自動チューニングを少なくとも 5 回実行することをおすすめします。

コホート名: 繰り返しワークロードのタイプを識別するのに役立つコホート名を使用することをおすすめします。たとえば、毎日売上高集計タスクを実行するスケジュール設定されたワークロードのコホート名として daily_sales_aggregation を使用できます。

自動チューニングのシナリオ

該当する場合、自動チューニングは次の scenarios または目標を自動的に選択して実行し、バッチ ワークロードを最適化します。

  • スケーリング: Spark 自動スケーリング構成の設定。
  • 結合の最適化: SQL ブロードキャスト結合のパフォーマンスを最適化する Spark 構成設定。

Serverless for Apache Spark の自動チューニングを使用する

Google Cloud コンソール、Google Cloud CLI、Dataproc API、または Cloud クライアント ライブラリを使用して、バッチ ワークロードで Apache Spark 用サーバーレス自動調整を有効にできます。

コンソール

定期的なバッチ ワークロードを送信するたびに Apache Spark 向けサーバーレス自動チューニングを有効にするには、次の操作を行います。

  1. Google Cloud コンソールで、Dataproc の [バッチ] ページに移動します。

    Dataproc バッチに移動

  2. バッチ ワークロードを作成するには、[作成] をクリックします。

  3. [自動チューニング] セクションで、次の操作を行います。

    • [有効にする] ボタンを切り替えて、Spark ワークロードの自動チューニングを有効にします。

    • コホート: コホート名を入力します。バッチが一連の反復的なワークロードの 1 つとして識別されます。自動チューニングは、このコホート名で送信された 2 回目以降のワークロードに適用されます。たとえば、毎日の販売集計タスクを実行するスケジュール設定されたバッチ ワークロードのコホート名として daily_sales_aggregation を指定します。

  4. 必要に応じて [バッチを作成] ページの他のセクションに入力し、[送信] をクリックします。これらのフィールドの詳細については、バッチ ワークロードを送信するをご覧ください。

gcloud

定期的なバッチ ワークロードが送信されるたびに Apache Spark 用 Serverless の自動チューニングを有効にするには、次の gcloud CLI の gcloud dataproc batches submit コマンドをターミナル ウィンドウまたは Cloud Shell でローカルに実行します。

gcloud dataproc batches submit COMMAND \
    --region=REGION \
    --cohort=COHORT \
    --autotuning-scenarios=auto  \
    other arguments ...

次のように置き換えます。

  • COMMAND: Spark ワークロード タイプ(SparkPySparkSpark-SqlSpark-R など)。
  • REGION: バッチ ワークロードが実行されるリージョン
  • COHORT: コホート名。バッチが一連の繰り返しワークロードの 1 つとして識別されます。自動チューニングは、このコホート名で送信された 2 回目以降のワークロードに適用されます。たとえば、毎日の販売集計タスクを実行するスケジュール設定されたバッチ ワークロードのコホート名として daily_sales_aggregation を指定します。
  • --autotuning-scenarios=auto: 自動チューニングを有効にします。

API

定期的なバッチ ワークロードを送信するたびに Apache Spark 向け Serverless 自動チューニングを有効にするには、次のフィールドを含む batches.create リクエストを送信します。

  • RuntimeConfig.cohort: コホート名。バッチが一連の繰り返しワークロードの 1 つとして識別されます。自動チューニングは、このコホート名で送信された 2 回目以降のワークロードに適用されます。たとえば、毎日の販売集計タスクを実行するスケジュール設定されたバッチ ワークロードのコホート名として daily_sales_aggregation を指定します。
  • AutotuningConfig.scenarios: Spark バッチ ワークロードで自動チューニングを有効にするには、AUTO を指定します。

例:

...
runtimeConfig:
  cohort: COHORT_NAME
  autotuningConfig:
    scenarios:
    - AUTO
...

Java

このサンプルを試す前に、クライアント ライブラリを使用した Apache Spark 用 Serverless クイックスタートにある Java の設定手順を行ってください。 詳細については、Serverless for Apache Spark Java API のリファレンス ドキュメントをご覧ください。

Serverless for Apache Spark への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

定期的なバッチ ワークロードが送信されるたびに Apache Spark 向け Serverless の自動チューニングを有効にするには、次のフィールドを含む CreateBatchRequest を使用して BatchControllerClient.createBatch を呼び出します。

  • Batch.RuntimeConfig.cohort: コホート名。バッチが一連の繰り返しワークロードの 1 つとして識別されます。自動チューニングは、このコホート名で送信された 2 回目以降のワークロードに適用されます。たとえば、毎日の販売集計タスクを実行するスケジュール設定されたバッチ ワークロードのコホート名として daily_sales_aggregation を指定します。
  • Batch.RuntimeConfig.AutotuningConfig.scenarios: Spark バッチ ワークロードで自動チューニングを有効にするには、AUTO を指定します。

例:

...
Batch batch =
  Batch.newBuilder()
    .setRuntimeConfig(
      RuntimeConfig.newBuilder()
        .setCohort("daily_sales_aggregation")
        .setAutotuningConfig(
          AutotuningConfig.newBuilder()
            .addScenarios(Scenario.AUTO))
    ...
  .build();

batchControllerClient.createBatch(
    CreateBatchRequest.newBuilder()
        .setParent(parent)
        .setBatchId(batchId)
        .setBatch(batch)
        .build());
...

API を使用するには、google-cloud-dataproc クライアント ライブラリのバージョン 4.43.0 以降を使用する必要があります。次のいずれかの構成を使用して、ライブラリをプロジェクトに追加できます。

Maven

<dependencies>
 <dependency>
   <groupId>com.google.cloud</groupId>
   <artifactId>google-cloud-dataproc</artifactId>
   <version>4.43.0</version>
 </dependency>
</dependencies>

Gradle

implementation 'com.google.cloud:google-cloud-dataproc:4.43.0'

SBT

libraryDependencies += "com.google.cloud" % "google-cloud-dataproc" % "4.43.0"

Python

このサンプルを試す前に、クライアント ライブラリを使用した Apache Spark 用 Serverless クイックスタートにある Python の設定手順を行ってください。 詳細については、Serverless for Apache Spark Python API のリファレンス ドキュメントをご覧ください。

Serverless for Apache Spark への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証の設定をご覧ください。

定期的なバッチ ワークロードが送信されるたびに Apache Spark 向け Serverless の自動チューニングを有効にするには、次のフィールドを含む Batch を使用して BatchControllerClient.create_batch を呼び出します。

  • batch.runtime_config.cohort: コホート名。バッチが一連の繰り返しワークロードの 1 つとして識別されます。自動チューニングは、このコホート名で送信された 2 回目以降のワークロードに適用されます。たとえば、毎日の販売集計タスクを実行するスケジュール設定されたバッチ ワークロードのコホート名として daily_sales_aggregation を指定します。
  • batch.runtime_config.autotuning_config.scenarios: Spark バッチ ワークロードで自動チューニングを有効にするには、AUTO を指定します。

例:

# Create a client
client = dataproc_v1.BatchControllerClient()

# Initialize request argument(s)
batch = dataproc_v1.Batch()
batch.pyspark_batch.main_python_file_uri = "gs://bucket/run_tpcds.py"
batch.runtime_config.cohort = "daily_sales_aggregation"
batch.runtime_config.autotuning_config.scenarios = [
    Scenario.AUTO
]

request = dataproc_v1.CreateBatchRequest(
    parent="parent_value",
    batch=batch,
)

# Make the request
operation = client.create_batch(request=request)

API を使用するには、google-cloud-dataproc クライアント ライブラリ バージョン 5.10.1 以降を使用する必要があります。プロジェクトに追加するには、次の要件を使用します。

google-cloud-dataproc>=5.10.1

Airflow

自動チューニングされたバッチ コホートを個別に手動で送信する代わりに、Airflow を使用して定期的なバッチ ワークロードの送信をスケジュールできます。これを行うには、次のフィールドを含む Batch を使用して BatchControllerClient.create_batch を呼び出します。

  • batch.runtime_config.cohort: コホート名。バッチが一連の繰り返しワークロードの 1 つとして識別されます。自動チューニングは、このコホート名で送信された 2 回目以降のワークロードに適用されます。たとえば、毎日の販売集計タスクを実行するスケジュール設定されたバッチ ワークロードのコホート名として daily_sales_aggregation を指定します。
  • batch.runtime_config.autotuning_config.scenarios: Spark バッチ ワークロードで自動チューニングを有効にするには、AUTO を指定します。

例:

create_batch = DataprocCreateBatchOperator(
    task_id="batch_create",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "cohort": "daily_sales_aggregation",
            "autotuning_config": {
                "scenarios": [
                    Scenario.AUTO,
                ]
            }
        },
    },
    batch_id="BATCH_ID",
)

API を使用するには、google-cloud-dataproc クライアント ライブラリ バージョン 5.10.1 以降を使用する必要があります。次の Airflow 環境要件を使用できます。

google-cloud-dataproc>=5.10.1

Cloud Composer でパッケージを更新するには、Cloud Composer の Python 依存関係をインストールする をご覧ください。

自動チューニングの変更を表示する

バッチ ワークロードに対する Apache Spark 向け Serverless の自動チューニングの変更を表示するには、gcloud dataproc batches describe コマンドを実行します。

例: gcloud dataproc batches describe 出力は次のようになります。

...
runtimeInfo:
  propertiesInfo:
    # Properties set by autotuning.
    autotuningProperties:
      spark.dataproc.sql.broadcastJoin.hints:
        annotation: Converted 1 Sort-Merge Joins to Broadcast Hash Join
        value: v2;Inner,<hint>
      spark.dynamicAllocation.initialExecutors:
        annotation: Adjusted Initial executors based on stages submitted in first
          2 minutes to 9
        overriddenValue: '2'
        value: '9'
      spark.dynamicAllocation.maxExecutors:
        annotation: Tuned Max executors to 11
        overriddenValue: '5'
        value: '11'
      spark.dynamicAllocation.minExecutors:
        annotation: Changed Min executors to 9
        overriddenValue: '2'
        value: '9'
...

実行中のワークロード、完了したワークロード、または失敗したワークロードに適用された最新の自動チューニングの変更は、 Google Cloud コンソールの [バッチの詳細] ページの [概要] タブで確認できます。

自動チューニングの概要パネル。

料金

Apache Spark 用サーバーレスの自動チューニングは、限定公開プレビュー期間中は追加料金なしで提供されます。標準の Apache Spark 用 Serverless の料金が適用されます。