Ajuste automático de cargas de trabajo por lotes de Spark

En este documento se proporciona información sobre el ajuste automático de las cargas de trabajo por lotes de Serverless para Apache Spark. Optimizar una carga de trabajo de Spark para mejorar el rendimiento y la resiliencia puede ser complicado debido al número de opciones de configuración de Spark y a la dificultad de evaluar cómo influyen esas opciones en una carga de trabajo. El ajuste automático de Serverless para Apache Spark ofrece una alternativa a la configuración manual de la carga de trabajo. Para ello, aplica automáticamente los ajustes de configuración de Spark a una carga de trabajo de Spark recurrente en función de las prácticas recomendadas de optimización de Spark y de un análisis de las ejecuciones de la carga de trabajo (denominadas "cohortes").

Registrarse para usar el ajuste automático de Serverless para Apache Spark

Para registrarte y acceder a la versión preliminar de la función de ajuste automático de Serverless para Apache Spark, que se describe en esta página, rellena y envía el formulario de registro de solicitud de acceso a la versión preliminar de Dataproc. Una vez que se apruebe el formulario, los proyectos que se incluyan en él tendrán acceso a las funciones de vista previa.

Ventajas

La optimización automática de Serverless para Apache Spark puede ofrecer las siguientes ventajas:

  • Optimización automática: ajusta automáticamente las configuraciones de Spark y de Serverless para Apache Spark por lotes que sean ineficientes, lo que puede acelerar los tiempos de ejecución de las tareas.
  • Aprendizaje histórico: aprende de las ejecuciones recurrentes para aplicar recomendaciones adaptadas a tu carga de trabajo.

Cohortes de ajuste automático

El ajuste automático se aplica a las ejecuciones periódicas (cohortes) de una carga de trabajo por lotes.

El nombre de cohorte que especifiques al enviar una carga de trabajo por lotes la identifica como una de las ejecuciones sucesivas de la carga de trabajo periódica.

El ajuste automático se aplica a las cohortes de cargas de trabajo por lotes de la siguiente manera:

  • El ajuste automático se calcula y se aplica a la segunda y a las siguientes cohortes de una carga de trabajo. El ajuste automático no se aplica en la primera ejecución de una carga de trabajo periódica, ya que usa el historial de la carga de trabajo para optimizarla.

  • El ajuste automático no se aplica de forma retroactiva a las cargas de trabajo en ejecución, sino solo a las cargas de trabajo enviadas recientemente.

  • La optimización automática aprende y mejora con el tiempo analizando las estadísticas de la cohorte. Para que el sistema pueda recoger suficientes datos, te recomendamos que mantengas la optimización automática habilitada durante al menos cinco ejecuciones.

Nombres de cohorte: te recomendamos que uses nombres de cohorte que te ayuden a identificar el tipo de carga de trabajo recurrente. Por ejemplo, puedes usar daily_sales_aggregation como nombre de cohorte para una carga de trabajo programada que ejecute una tarea de agregación de ventas diaria.

Situaciones de ajuste automático

Cuando procede, la optimización automática selecciona y ejecuta automáticamente los siguientes scenarios u objetivos para optimizar una carga de trabajo por lotes:

  • Escalado: ajustes de configuración del autoescalado de Spark.
  • Optimización de las uniones: ajustes de configuración de Spark para optimizar el rendimiento de las uniones de difusión de SQL.

Usar el ajuste automático de Serverless para Apache Spark

Puedes habilitar el ajuste automático de Serverless para Apache Spark en una carga de trabajo por lotes mediante la Google Cloud consola, la CLI de Google Cloud o la API Dataproc, o bien las bibliotecas de cliente de Cloud.

Consola

Para habilitar la optimización automática de Serverless para Apache Spark en cada envío de una carga de trabajo por lotes periódica, sigue estos pasos:

  1. En la Google Cloud consola, ve a la página Lotes de Dataproc.

    Ir a Lotes de Dataproc

  2. Para crear una carga de trabajo por lotes, haz clic en Crear.

  3. En la sección Ajuste automático:

    • Activa el botón Habilitar para habilitar el ajuste automático de la carga de trabajo de Spark.

    • Cohorte: indica el nombre de la cohorte, que identifica el lote como una de una serie de cargas de trabajo recurrentes. El ajuste automático se aplica a la segunda y las siguientes cargas de trabajo que se envían con este nombre de cohorte. Por ejemplo, especifica daily_sales_aggregation como nombre de la cohorte de una carga de trabajo por lotes programada que ejecuta una tarea de agregación de ventas diaria.

  4. Rellene otras secciones de la página Crear lote según sea necesario y, a continuación, haga clic en Enviar. Para obtener más información sobre estos campos, consulta Enviar una carga de trabajo por lotes.

gcloud

Para habilitar el ajuste automático de Serverless para Apache Spark en cada envío de una carga de trabajo por lotes periódica, ejecuta el siguiente comando de la CLI de gcloud gcloud dataproc batches submit de forma local en una ventana de terminal o en Cloud Shell.

gcloud dataproc batches submit COMMAND \
    --region=REGION \
    --cohort=COHORT \
    --autotuning-scenarios=auto  \
    other arguments ...

Haz los cambios siguientes:

  • COMMAND: el tipo de carga de trabajo de Spark, como Spark, PySpark, Spark-Sql o Spark-R.
  • REGION: la región en la que se ejecutará tu carga de trabajo por lotes.
  • COHORT: el nombre de la cohorte, que identifica el lote como uno de una serie de cargas de trabajo recurrentes. El ajuste automático se aplica a la segunda y las siguientes cargas de trabajo que se envían con este nombre de cohorte. Por ejemplo, especifica daily_sales_aggregation como nombre de la cohorte de una carga de trabajo por lotes programada que ejecuta una tarea de agregación de ventas diaria.
  • --autotuning-scenarios=auto: habilita la sintonización automática.

API

Para habilitar el ajuste automático de Serverless para Apache Spark en cada envío de una carga de trabajo por lotes periódica, envía una solicitud batches.create que incluya los siguientes campos:

  • RuntimeConfig.cohort: el nombre de la cohorte, que identifica el lote como una de una serie de cargas de trabajo recurrentes. El ajuste automático se aplica a la segunda y las siguientes cargas de trabajo enviadas con este nombre de cohorte. Por ejemplo, especifica daily_sales_aggregation como nombre de cohorte de una carga de trabajo por lotes programada que ejecute una tarea de agregación de ventas diaria.
  • AutotuningConfig.scenarios: especifica AUTO para habilitar la optimización automática en la carga de trabajo por lotes de Spark.

Ejemplo:

...
runtimeConfig:
  cohort: COHORT_NAME
  autotuningConfig:
    scenarios:
    - AUTO
...

Java

Antes de probar este ejemplo, sigue las Javainstrucciones de configuración de la guía de inicio rápido de Serverless para Apache Spark con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Java Serverless para Apache Spark.

Para autenticarte en Serverless para Apache Spark, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

Para habilitar el ajuste automático de Serverless para Apache Spark en cada envío de una carga de trabajo por lotes periódica, llama a BatchControllerClient.createBatch con un CreateBatchRequest que incluya los siguientes campos:

  • Batch.RuntimeConfig.cohort: el nombre de la cohorte, que identifica el lote como una de una serie de cargas de trabajo recurrentes. El ajuste automático se aplica a la segunda y las siguientes cargas de trabajo enviadas con este nombre de cohorte. Por ejemplo, puede especificar daily_sales_aggregation como nombre de cohorte de una carga de trabajo por lotes programada que ejecute una tarea de agregación de ventas diaria.
  • Batch.RuntimeConfig.AutotuningConfig.scenarios: especifica AUTO para habilitar el ajuste automático en la carga de trabajo por lotes de Spark.

Ejemplo:

...
Batch batch =
  Batch.newBuilder()
    .setRuntimeConfig(
      RuntimeConfig.newBuilder()
        .setCohort("daily_sales_aggregation")
        .setAutotuningConfig(
          AutotuningConfig.newBuilder()
            .addScenarios(Scenario.AUTO))
    ...
  .build();

batchControllerClient.createBatch(
    CreateBatchRequest.newBuilder()
        .setParent(parent)
        .setBatchId(batchId)
        .setBatch(batch)
        .build());
...

Para usar la API, debes usar la versión google-cloud-dataproc o una posterior de la biblioteca de cliente 4.43.0. Puedes usar una de las siguientes configuraciones para añadir la biblioteca a tu proyecto.

Maven

<dependencies>
 <dependency>
   <groupId>com.google.cloud</groupId>
   <artifactId>google-cloud-dataproc</artifactId>
   <version>4.43.0</version>
 </dependency>
</dependencies>

Gradle

implementation 'com.google.cloud:google-cloud-dataproc:4.43.0'

SBT

libraryDependencies += "com.google.cloud" % "google-cloud-dataproc" % "4.43.0"

Python

Antes de probar este ejemplo, sigue las Pythoninstrucciones de configuración de la guía de inicio rápido de Serverless para Apache Spark con bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Python Serverless para Apache Spark.

Para autenticarte en Serverless para Apache Spark, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación en un entorno de desarrollo local.

Para habilitar el ajuste automático de Serverless para Apache Spark en cada envío de una carga de trabajo por lotes periódica, llama a BatchControllerClient.create_batch con un Batch que incluya los siguientes campos:

  • batch.runtime_config.cohort: el nombre de la cohorte, que identifica el lote como una de una serie de cargas de trabajo recurrentes. El ajuste automático se aplica a la segunda y las siguientes cargas de trabajo enviadas con este nombre de cohorte. Por ejemplo, puedes especificar daily_sales_aggregation como nombre de cohorte de una carga de trabajo por lotes programada que ejecute una tarea de agregación de ventas diaria.
  • batch.runtime_config.autotuning_config.scenarios: especifica AUTO para habilitar la optimización automática en la carga de trabajo por lotes de Spark.

Ejemplo:

# Create a client
client = dataproc_v1.BatchControllerClient()

# Initialize request argument(s)
batch = dataproc_v1.Batch()
batch.pyspark_batch.main_python_file_uri = "gs://bucket/run_tpcds.py"
batch.runtime_config.cohort = "daily_sales_aggregation"
batch.runtime_config.autotuning_config.scenarios = [
    Scenario.AUTO
]

request = dataproc_v1.CreateBatchRequest(
    parent="parent_value",
    batch=batch,
)

# Make the request
operation = client.create_batch(request=request)

Para usar la API, debes usar la versión 5.10.1 o una posterior de la biblioteca de cliente google-cloud-dataproc. Para añadirlo a tu proyecto, puedes usar el siguiente requisito:

google-cloud-dataproc>=5.10.1

Airflow

En lugar de enviar manualmente cada cohorte de lotes optimizado automáticamente, puede usar Airflow para programar el envío de cada carga de trabajo de lotes periódica. Para ello, llama a BatchControllerClient.create_batch con un Batch que incluya los siguientes campos:

  • batch.runtime_config.cohort: el nombre de la cohorte, que identifica el lote como una de una serie de cargas de trabajo recurrentes. El ajuste automático se aplica a la segunda y las siguientes cargas de trabajo enviadas con este nombre de cohorte. Por ejemplo, puede especificar daily_sales_aggregation como nombre de cohorte de una carga de trabajo por lotes programada que ejecute una tarea de agregación de ventas diaria.
  • batch.runtime_config.autotuning_config.scenarios: especifica AUTO para habilitar el ajuste automático en la carga de trabajo por lotes de Spark.

Ejemplo:

create_batch = DataprocCreateBatchOperator(
    task_id="batch_create",
    batch={
        "pyspark_batch": {
            "main_python_file_uri": PYTHON_FILE_LOCATION,
        },
        "environment_config": {
            "peripherals_config": {
                "spark_history_server_config": {
                    "dataproc_cluster": PHS_CLUSTER_PATH,
                },
            },
        },
        "runtime_config": {
            "cohort": "daily_sales_aggregation",
            "autotuning_config": {
                "scenarios": [
                    Scenario.AUTO,
                ]
            }
        },
    },
    batch_id="BATCH_ID",
)

Para usar la API, debes usar la versión 5.10.1 o una posterior de la biblioteca de cliente google-cloud-dataproc. Puedes usar el siguiente requisito del entorno de Airflow:

google-cloud-dataproc>=5.10.1

Para actualizar el paquete en Cloud Composer, consulta Instalar dependencias de Python para Cloud Composer .

Ver los cambios de ajuste automático

Para ver los cambios de ajuste automático de Serverless para Apache Spark en una carga de trabajo por lotes, ejecuta el comando gcloud dataproc batches describe.

Ejemplo: el resultado de gcloud dataproc batches describe es similar al siguiente:

...
runtimeInfo:
  propertiesInfo:
    # Properties set by autotuning.
    autotuningProperties:
      spark.dataproc.sql.broadcastJoin.hints:
        annotation: Converted 1 Sort-Merge Joins to Broadcast Hash Join
        value: v2;Inner,<hint>
      spark.dynamicAllocation.initialExecutors:
        annotation: Adjusted Initial executors based on stages submitted in first
          2 minutes to 9
        overriddenValue: '2'
        value: '9'
      spark.dynamicAllocation.maxExecutors:
        annotation: Tuned Max executors to 11
        overriddenValue: '5'
        value: '11'
      spark.dynamicAllocation.minExecutors:
        annotation: Changed Min executors to 9
        overriddenValue: '2'
        value: '9'
...

Puede ver los últimos cambios de ajuste automático que se han aplicado a una carga de trabajo en curso, completada o fallida en la página Detalles del lote de la consola de Google Cloud , en la pestaña Resumen.

Panel de resumen de ajuste automático.

Precios

El ajuste automático de Serverless para Apache Spark se ofrece durante la versión preliminar privada sin coste adicional. Se aplican los precios estándar de Serverless para Apache Spark.