Ejecuta Spark en el clúster de Ray en Vertex AI

La biblioteca de Python RayDP permite ejecutar Spark en un clúster de Ray. En este documento, se explica cómo instalar, configurar y ejecutar RayDP en Ray en Vertex AI (clúster de Ray en Vertex AI).

Instalación

Ray en Vertex AI permite a los usuarios ejecutar sus aplicaciones con el framework de Ray de código abierto. RayDP proporciona APIs para ejecutar Spark en Ray. Las imágenes de contenedor compiladas previamente disponibles para crear un clúster de Ray en Vertex AI no vienen con RayDP preinstalado. Esto significa que debes crear una imagen de clúster de Ray personalizado en Vertex AI para que tu clúster de Ray en Vertex AI ejecute aplicaciones de RayDP en el clúster de Ray en Vertex AI. En la siguiente sección, se explica cómo compilar una imagen personalizada de RayDP.

Compila una imagen de contenedor personalizada de Ray en Vertex AI

Usa este Dockerfile para crear una imagen de contenedor personalizada para Ray en Vertex AI que tenga RayDP instalado.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

Puedes usar la imagen compilada previamente más reciente del clúster de Ray en Vertex AI para crear la imagen personalizada de RayDP. También puedes instalar otros paquetes de Python que crees que usarás en tus aplicaciones. El pyarrow==14.0 se debe a una restricción de dependencia de Ray 2.9.3.

Compila y envía la imagen del contenedor personalizado

Crea un repositorio de Docker en Artifact Registry antes de compilar tu imagen personalizada (consulta Trabaja con imágenes de contenedor para obtener información sobre cómo crear y configurar tu repositorio de Docker). Después de crear el repositorio de Docker, compila y envía la imagen de contenedor personalizada con el Dockerfile.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Aquí:

  • LOCATION: Es la ubicación de Cloud Storage (por ejemplo, us-central1) que creaste en Artifact Registry.
  • PROJECT_ID: El ID de tu proyecto de Google Cloud .
  • DOCKER_REPOSITORY: Es el nombre del repositorio de Docker que creaste.
  • IMAGE_NAME: Es el nombre de tus imágenes de contenedor personalizadas.

Crea un clúster de Ray en Vertex AI

Usa la imagen de contenedor personalizada compilada en el paso anterior para crear un clúster de Ray en Vertex AI. Puedes usar el SDK de Vertex AI para Python para crear un clúster de Ray en Vertex AI.

Si aún no lo has hecho, instala las bibliotecas de Python necesarias.

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

Configura los nodos principales y trabajadores, y crea el clúster con el SDK de Vertex AI para Python Por ejemplo:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

Donde:

  • CUSTOM_CONTAINER_IMAGE_URI: Es el URI de la imagen del contenedor personalizado que se envió a Artifact Registry.
  • CLUSTER_NAME: El nombre de tu clúster de Ray en Vertex AI.

Clúster de Spark on Ray en Vertex AI

Antes de ejecutar tu aplicación de Spark, crea una sesión de Spark con la API de RayDP. Puedes usar el cliente de Ray para hacerlo de forma interactiva o usar la API de Ray Job. Se recomienda la API de Ray Jobs, en especial para las aplicaciones de producción y de larga duración. La API de RayDP proporciona parámetros para configurar la sesión de Spark, además de admitir la configuración de Spark. Obtén más información sobre la API de RayDP para crear sesiones de Spark; consulta Afinidad de nodos de actores principales de Spark.

RayDP con Ray Client

Puedes usar la tarea o el actor de Ray para crear un clúster y una sesión de Spark en el clúster de Ray en Vertex AI. Se requiere la tarea de Ray o Actor para usar un cliente de Ray para crear una sesión de Spark en el clúster de Ray en Vertex AI. En el siguiente código, se muestra cómo un actor de Ray puede crear una sesión de Spark, ejecutar una aplicación de Spark y detener un clúster de Spark en un clúster de Ray en Vertex AI con RayDP.

Para conectarte de forma interactiva al clúster de Ray en Vertex AI, consulta Conéctate a un clúster de Ray a través de Ray Client.

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

RayDP con la API de Ray Job

El cliente de Ray es útil para experimentos pequeños que requieren una conexión interactiva con el clúster de Ray. La API de Ray Jobs es la forma recomendada de ejecutar trabajos de producción y de larga duración en un clúster de Ray. Esto también se aplica a la ejecución de aplicaciones de Spark en el clúster de Ray en Vertex AI.

Crea una secuencia de comandos de Python que contenga el código de tu aplicación de Spark. Por ejemplo:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Envía el trabajo para ejecutar la secuencia de comandos de Python con la API de Ray Jobs. Por ejemplo:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

Donde:

  • SCRIPT_NAME: Es el nombre del archivo de la secuencia de comandos que creaste.

Cómo leer archivos de Cloud Storage desde una aplicación de Spark

Es una práctica común almacenar archivos de datos en un bucket de Google Cloud Storage. Puedes leer estos archivos de varias maneras desde una aplicación de Spark que se ejecuta en el clúster de Ray en Vertex AI. En esta sección, se explican dos técnicas para leer archivos de Cloud Storage desde aplicaciones de Spark que se ejecutan en el clúster de Ray en Vertex AI.

Usa el conector de Google Cloud Storage

Puedes usar el Google Cloud conector para Hadoop para leer archivos de un bucket de Cloud Storage desde tu aplicación Spark. Después de crear una sesión de Spark con RayDP, puedes leer archivos con algunos parámetros de configuración. En el siguiente código, se muestra cómo leer un archivo CSV almacenado en un bucket de Cloud Storage desde una aplicación de Spark en el clúster de Ray en Vertex AI.

import raydp

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

Aquí:

  • GCS_FILE_URI: Es el URI de un archivo almacenado en un bucket de Cloud Storage. Por ejemplo: gs://my-bucket/my-file.csv.

Usa datos de Ray

El conector Google Cloud proporciona una forma de leer archivos de un bucket Google Cloudy puede ser suficiente para la mayoría de los casos de uso. Es posible que desees usar Ray Data para leer archivos del bucket de Google Cloud cuando necesites usar el procesamiento distribuido de Ray para leer datos o cuando tengas problemas para leer archivos deGoogle Cloud con el conector de Google Cloud . Esto podría ocurrir debido a conflictos de dependencias de Java cuando se agregan otras dependencias de la aplicación a la ruta de clase de Java de Spark con spark.jars.packages o spark.jars.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the Cloud Storage connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

UDF de Pyspark Pandas en el clúster de Ray en Vertex AI

Las UDF de Pyspark Pandas a veces requieren código adicional cuando las usas en tu aplicación de Spark que se ejecuta en un clúster de Ray en Vertex AI. Por lo general, esto es necesario cuando la UDF de Pandas usa una biblioteca de Python que no está disponible en el clúster de Ray en Vertex AI. Puedes empaquetar las dependencias de Python de una aplicación con el entorno de ejecución y la API de Ray Job. Después de enviar el trabajo de Ray al clúster, Ray instala esas dependencias en el entorno virtual de Python que crea para ejecutar el trabajo. Sin embargo, las UDF de Pandas no usan el mismo entorno virtual. En su lugar, usan el entorno predeterminado del sistema de Python. Si esa dependencia no está disponible en el entorno del sistema, es posible que debas instalarla en tu UDF de Pandas. En el siguiente ejemplo, instala la biblioteca statsmodels dentro de la UDF.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd

    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf

    d = {'Lottery': s1,
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':

    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )

    print(test_udf(spark))

    raydp.stop_spark()