La biblioteca de Python RayDP permite ejecutar Spark en un clúster de Ray. En este documento, se explica cómo instalar, configurar y ejecutar RayDP en Ray en Vertex AI (clúster de Ray en Vertex AI).
Instalación
Ray en Vertex AI permite a los usuarios ejecutar sus aplicaciones con el framework de Ray de código abierto. RayDP proporciona APIs para ejecutar Spark en Ray. Las imágenes de contenedor compiladas previamente disponibles para crear un clúster de Ray en Vertex AI no vienen con RayDP preinstalado. Esto significa que debes crear una imagen de clúster de Ray personalizado en Vertex AI para que tu clúster de Ray en Vertex AI ejecute aplicaciones de RayDP en el clúster de Ray en Vertex AI. En la siguiente sección, se explica cómo compilar una imagen personalizada de RayDP.
Compila una imagen de contenedor personalizada de Ray en Vertex AI
Usa este Dockerfile para crear una imagen de contenedor personalizada para Ray en Vertex AI que tenga RayDP instalado.
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest RUN apt-get update -y \ && pip install --no-cache-dir raydp pyarrow==14.0
Puedes usar la imagen compilada previamente más reciente del clúster de Ray en Vertex AI para crear la imagen personalizada de RayDP. También puedes instalar otros paquetes de Python que crees que usarás en tus aplicaciones. El pyarrow==14.0
se debe a una restricción de dependencia de Ray 2.9.3.
Compila y envía la imagen del contenedor personalizado
Crea un repositorio de Docker en Artifact Registry antes de compilar tu imagen personalizada (consulta Trabaja con imágenes de contenedor para obtener información sobre cómo crear y configurar tu repositorio de Docker). Después de crear el repositorio de Docker, compila y envía la imagen de contenedor personalizada con el Dockerfile.
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME] docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
Aquí:
LOCATION
: Es la ubicación de Cloud Storage (por ejemplo, us-central1) que creaste en Artifact Registry.PROJECT_ID
: El ID de tu proyecto de Google Cloud .DOCKER_REPOSITORY
: Es el nombre del repositorio de Docker que creaste.IMAGE_NAME
: Es el nombre de tus imágenes de contenedor personalizadas.
Crea un clúster de Ray en Vertex AI
Usa la imagen de contenedor personalizada compilada en el paso anterior para crear un clúster de Ray en Vertex AI. Puedes usar el SDK de Vertex AI para Python para crear un clúster de Ray en Vertex AI.
Si aún no lo has hecho, instala las bibliotecas de Python necesarias.
pip install --quiet google-cloud-aiplatform \ ray[all]==2.9.3 \ google-cloud-aiplatform[ray]
Configura los nodos principales y trabajadores, y crea el clúster con el SDK de Vertex AI para Python Por ejemplo:
import logging import ray from google.cloud import aiplatform from google.cloud.aiplatform import vertex_ray from vertex_ray import Resources head_node_type = Resources( machine_type="n1-standard-16", node_count=1, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], ) worker_node_types = [Resources( machine_type="n1-standard-8", node_count=2, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], )] ray_cluster_resource_name = vertex_ray.create_ray_cluster( head_node_type=head_node_type, worker_node_types=worker_node_types, cluster_name=[CLUSTER_NAME], )
Donde:
CUSTOM_CONTAINER_IMAGE_URI
: Es el URI de la imagen del contenedor personalizado que se envió a Artifact Registry.CLUSTER_NAME
: El nombre de tu clúster de Ray en Vertex AI.
Clúster de Spark on Ray en Vertex AI
Antes de ejecutar tu aplicación de Spark, crea una sesión de Spark con la API de RayDP. Puedes usar el cliente de Ray para hacerlo de forma interactiva o usar la API de Ray Job. Se recomienda la API de Ray Jobs, en especial para las aplicaciones de producción y de larga duración. La API de RayDP proporciona parámetros para configurar la sesión de Spark, además de admitir la configuración de Spark. Obtén más información sobre la API de RayDP para crear sesiones de Spark; consulta Afinidad de nodos de actores principales de Spark.
RayDP con Ray Client
Puedes usar la tarea o el actor de Ray para crear un clúster y una sesión de Spark en el clúster de Ray en Vertex AI. Se requiere la tarea de Ray o Actor para usar un cliente de Ray para crear una sesión de Spark en el clúster de Ray en Vertex AI. En el siguiente código, se muestra cómo un actor de Ray puede crear una sesión de Spark, ejecutar una aplicación de Spark y detener un clúster de Spark en un clúster de Ray en Vertex AI con RayDP.
Para conectarte de forma interactiva al clúster de Ray en Vertex AI, consulta Conéctate a un clúster de Ray a través de Ray Client.
@ray.remote class SparkExecutor: import pyspark spark: pyspark.sql.SparkSession = None def __init__(self): import ray import raydp self.spark = raydp.init_spark( app_name="RAYDP ACTOR EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) def get_data(self): df = self.spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(self): import raydp raydp.stop_spark() s = SparkExecutor.remote() data = ray.get(s.get_data.remote()) print(data) ray.get(s.stop_spark.remote())
RayDP con la API de Ray Job
El cliente de Ray es útil para experimentos pequeños que requieren una conexión interactiva con el clúster de Ray. La API de Ray Jobs es la forma recomendada de ejecutar trabajos de producción y de larga duración en un clúster de Ray. Esto también se aplica a la ejecución de aplicaciones de Spark en el clúster de Ray en Vertex AI.
Crea una secuencia de comandos de Python que contenga el código de tu aplicación de Spark. Por ejemplo:
import pyspark import raydp def get_data(spark: pyspark.sql.SparkSession): df = spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(): raydp.stop_spark() if __name__ == '__main__': spark = raydp.init_spark( app_name="RAYDP JOB EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) print(get_data(spark)) stop_spark()
Envía el trabajo para ejecutar la secuencia de comandos de Python con la API de Ray Jobs. Por ejemplo:
from ray.job_submission import JobSubmissionClient client = JobSubmissionClient(RAY_ADDRESS) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python [SCRIPT_NAME].py", # Path to the local directory that contains the python script file. runtime_env={ "working_dir": ".", } )
Donde:
SCRIPT_NAME
: Es el nombre del archivo de la secuencia de comandos que creaste.
Cómo leer archivos de Cloud Storage desde una aplicación de Spark
Es una práctica común almacenar archivos de datos en un bucket de Google Cloud Storage. Puedes leer estos archivos de varias maneras desde una aplicación de Spark que se ejecuta en el clúster de Ray en Vertex AI. En esta sección, se explican dos técnicas para leer archivos de Cloud Storage desde aplicaciones de Spark que se ejecutan en el clúster de Ray en Vertex AI.
Usa el conector de Google Cloud Storage
Puedes usar el Google Cloud conector para Hadoop para leer archivos de un bucket de Cloud Storage desde tu aplicación Spark. Después de crear una sesión de Spark con RayDP, puedes leer archivos con algunos parámetros de configuración. En el siguiente código, se muestra cómo leer un archivo CSV almacenado en un bucket de Cloud Storage desde una aplicación de Spark en el clúster de Ray en Vertex AI.
import raydp spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 1", configs={ "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
Aquí:
GCS_FILE_URI
: Es el URI de un archivo almacenado en un bucket de Cloud Storage. Por ejemplo: gs://my-bucket/my-file.csv.
Usa datos de Ray
El conector Google Cloud proporciona una forma de leer archivos de un bucket Google Cloudy puede ser suficiente para la mayoría de los casos de uso. Es posible que desees usar Ray Data para leer archivos del bucket de Google Cloud cuando necesites usar el procesamiento distribuido de Ray para leer datos o cuando tengas problemas para leer archivos deGoogle Cloud con el conector de Google Cloud . Esto podría ocurrir debido a conflictos de dependencias de Java cuando se agregan otras dependencias de la aplicación a la ruta de clase de Java de Spark con spark.jars.packages
o spark.jars
.
import raydp import ray spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 2", configs={ "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) # This doesn't work even though the Cloud Storage connector Jar and other parameters have been added to the Spark configuration. #spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True) ray_dataset = ray.data.read_csv(GCS_FILE_URI) spark_df = ray_dataset.to_spark(spark)
UDF de Pyspark Pandas en el clúster de Ray en Vertex AI
Las UDF de Pyspark Pandas a veces requieren código adicional cuando las usas en tu aplicación de Spark que se ejecuta en un clúster de Ray en Vertex AI. Por lo general, esto es necesario cuando la UDF de Pandas usa una biblioteca de Python que no está disponible en el clúster de Ray en Vertex AI. Puedes empaquetar las dependencias de Python de una aplicación con el entorno de ejecución y la API de Ray Job. Después de enviar el trabajo de Ray al clúster, Ray instala esas dependencias en el entorno virtual de Python que crea para ejecutar el trabajo. Sin embargo, las UDF de Pandas no usan el mismo entorno virtual. En su lugar, usan el entorno predeterminado del sistema de Python. Si esa dependencia no está disponible en el entorno del sistema, es posible que debas instalarla en tu UDF de Pandas. En el siguiente ejemplo, instala la biblioteca statsmodels
dentro de la UDF.
import pandas as pd import pyspark import raydp from pyspark.sql.functions import pandas_udf from pyspark.sql.types import StringType def test_udf(spark: pyspark.sql.SparkSession): import pandas as pd df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")) return df.select(func('Lottery','Literacy', 'Pop1831')).collect() @pandas_udf(StringType()) def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: import numpy as np import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"]) import statsmodels.api as sm import statsmodels.formula.api as smf d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3} data = pd.DataFrame(d) # Fit regression model (using the natural log of one of the regressors) results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit() return results.summary().as_csv() if __name__ == '__main__': spark = raydp.init_spark( app_name="RayDP UDF Example", num_executors=2, executor_cores=4, executor_memory="1500M", ) print(test_udf(spark)) raydp.stop_spark()