A biblioteca Python RayDP torna possível executar o Spark em um cluster do Ray. Este documento abrange a instalação, a configuração e a execução do RayDP no Ray na Vertex AI (cluster do Ray na Vertex AI).
Instalação
O Ray na Vertex AI permite que os usuários executem aplicativos usando o framework do Ray de código aberto. A RayDP fornece APIs para executar o Spark no Ray. As imagens de contêiner pré-criadas disponíveis para criar um cluster do Ray na Vertex AI não vêm com o RayDP pré-instalado. Isso significa que você precisa criar uma imagem de cluster do Ray personalizado na Vertex AI para que o cluster do Ray na Vertex AI execute aplicativos do RayDP no cluster do Ray na Vertex AI. A seção a seguir explica como criar uma imagem personalizada do RayDP.
Criar uma imagem de contêiner personalizada do Ray na Vertex AI
Use este dockerfile para criar uma imagem de contêiner personalizada para o Ray na Vertex AI com o RayDP instalado.
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest RUN apt-get update -y \ && pip install --no-cache-dir raydp pyarrow==14.0
É possível usar o cluster do Ray mais recente na imagem pré-criada da Vertex AI para
criar a imagem personalizada do RayDP. Você também pode instalar outros pacotes Python
que você prevê que usará nos seus aplicativos. O pyarrow==14.0
é devido
a uma restrição de dependência do Ray 2.9.3.
Criar e enviar a imagem de contêiner personalizada
Crie um repositório do Docker no Artifact Registry antes de criar sua imagem personalizada (consulte Trabalhar com imagens de contêiner para saber como criar e configurar seu repositório do Docker). Depois de criar o repositório do Docker, crie e envie a imagem de contêiner personalizada usando o Dockerfile.
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME] docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
Em que:
LOCATION
: o local do Cloud Storage (por exemplo, us-central1) que você criou no seu Artifact Registry.PROJECT_ID
: o ID do projeto do Google Cloud .DOCKER_REPOSITORY
: o nome do repositório do Docker que você criou.IMAGE_NAME
: o nome das suas imagens de contêiner personalizadas.
Criar um cluster do Ray na Vertex AI
Use a imagem personalizada de contêiner criada na etapa anterior para criar um cluster do Ray na Vertex AI. O SDK da Vertex AI para Python pode ser usado para criar um cluster do Ray na Vertex AI.
Instale as bibliotecas necessárias do Python, caso ainda não tenha feito isso.
pip install --quiet google-cloud-aiplatform \ ray[all]==2.9.3 \ google-cloud-aiplatform[ray]
Configure nós de cabeçalho e de trabalho e crie o cluster usando o SDK da Vertex AI para Python. Por exemplo:
import logging import ray from google.cloud import aiplatform from google.cloud.aiplatform import vertex_ray from vertex_ray import Resources head_node_type = Resources( machine_type="n1-standard-16", node_count=1, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], ) worker_node_types = [Resources( machine_type="n1-standard-8", node_count=2, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], )] ray_cluster_resource_name = vertex_ray.create_ray_cluster( head_node_type=head_node_type, worker_node_types=worker_node_types, cluster_name=[CLUSTER_NAME], )
Em que:
CUSTOM_CONTAINER_IMAGE_URI
: o URI da imagem de contêiner personalizada enviada para o Artifact Registry.CLUSTER_NAME
: o nome do cluster do Ray na Vertex AI.
Cluster do Spark no Ray na Vertex AI
Antes de executar seu aplicativo Spark, crie uma sessão do Spark usando a API RayDP. É possível usar o cliente Ray para fazer isso de maneira interativa ou usar a API de jobs do Ray. A API de job do Ray é recomendada, especialmente para aplicativos de produção e de longa duração. A API RayDP fornece parâmetros para configurar a sessão do Spark, além de oferecer suporte à configuração do Spark. Para saber mais sobre a API RayDP para criar uma sessão do Spark, consulte Afinidade do nó de atores mestres do Spark.
RayDP com cliente Ray
Você pode usar Ray Task ou Actor para criar um cluster e sessão do Spark no cluster do Ray na Vertex AI. O Ray Task ou Actor é necessário para usar um Cliente do Ray para criar uma sessão do Spark no cluster do Ray na Vertex AI. O código a seguir mostra como um Ray Actor pode criar uma sessão do Spark, executar um aplicativo do Spark e interromper um cluster do Spark em um cluster do Ray na Vertex AI usando o RayDP.
Para se conectar interativamente ao cluster do Ray na Vertex AI, consulte Conectar-se a um cluster do Ray pelo cliente do Ray
@ray.remote class SparkExecutor: import pyspark spark: pyspark.sql.SparkSession = None def __init__(self): import ray import raydp self.spark = raydp.init_spark( app_name="RAYDP ACTOR EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) def get_data(self): df = self.spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(self): import raydp raydp.stop_spark() s = SparkExecutor.remote() data = ray.get(s.get_data.remote()) print(data) ray.get(s.stop_spark.remote())
RayDP com a API Ray Job
O cliente do Ray é útil para pequenos experimentos que exigem conexão com o cluster do Ray. A API Ray Job é a maneira recomendada de executar jobs de longa duração e de produção em um cluster do Ray. Isso também se aplica à execução de aplicativos Spark no cluster do Ray na Vertex AI.
Criar um script Python que contenha o código do aplicativo Spark. Por exemplo:
import pyspark import raydp def get_data(spark: pyspark.sql.SparkSession): df = spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(): raydp.stop_spark() if __name__ == '__main__': spark = raydp.init_spark( app_name="RAYDP JOB EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) print(get_data(spark)) stop_spark()
Envie o job para executar o script do Python usando a API Ray Job. Por exemplo:
from ray.job_submission import JobSubmissionClient client = JobSubmissionClient(RAY_ADDRESS) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python [SCRIPT_NAME].py", # Path to the local directory that contains the python script file. runtime_env={ "working_dir": ".", } )
Em que:
SCRIPT_NAME
: o nome do arquivo do script que você criou.
Como ler arquivos do Cloud Storage do aplicativo Spark
É uma prática comum armazenar arquivos de dados em um bucket do Google Cloud Storage. É possível ler esses arquivos de várias maneiras em um aplicativo Spark que está em execução no cluster do Ray na Vertex AI. Esta seção explica duas técnicas para ler arquivos do Cloud Storage de aplicativos Spark em execução no cluster do Ray na Vertex AI.
Usar o conector do Google Cloud Storage
É possível usar o Google Cloud Connector para Hadoop para ler arquivos de um bucket do Cloud Storage do aplicativo Spark. Depois de criar uma sessão do Spark usando o RayDP, você pode ler arquivos usando alguns parâmetros de configuração. O código a seguir mostra como ler um arquivo CSV armazenado em um bucket do Cloud Storage em um aplicativo Spark no cluster do Ray na Vertex AI.
import raydp spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 1", configs={ "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
Em que:
GCS_FILE_URI
: o URI de um arquivo armazenado em um bucket do Cloud Storage. Por exemplo: gs://my-bucket/my-file.csv.
Usar dados do Ray
O conector Google Cloud permite ler arquivos de um bucket Google Cloud
e pode ser suficiente para a maioria dos casos de uso. Talvez você queira usar
o Ray Data para ler arquivos do bucket Google Cloud quando precisar usar o
processamento distribuído do Ray para ler dados ou quando tiver problemas para ler o
arquivoGoogle Cloud com o conector Google Cloud . Isso pode acontecer devido a conflitos de dependência do Java quando outras dependências de aplicativos são adicionadas ao classpath do Java do Spark usando spark.jars.packages
ou spark.jars
.
import raydp import ray spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 2", configs={ "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) # This doesn't work even though the Cloud Storage connector Jar and other parameters have been added to the Spark configuration. #spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True) ray_dataset = ray.data.read_csv(GCS_FILE_URI) spark_df = ray_dataset.to_spark(spark)
UDF do Pyspark Pandas no cluster do Ray na Vertex AI
As UDFs do Pyspark Pandas
às vezes exigem código adicional quando usadas no aplicativo
Spark em execução em um cluster do Ray na Vertex AI. Isso geralmente é
necessário quando a UDF do Pandas usa uma biblioteca Python indisponível no
cluster do Ray na Vertex AI. É possível empacotar as
dependências do Python
de um aplicativo usando o ambiente de execução com a API Ray Job. Depois que você
enviar o job do Ray ao cluster, ele vai instalar essas dependências no
ambiente virtual Python criado para executar o job. As
UDFs do Pandas,
no entanto, não usam o mesmo ambiente virtual. Em vez disso, eles usam o
ambiente de sistema Python padrão. Se essa dependência não estiver disponível no ambiente do
sistema, talvez seja necessário instalá-lo na UDF do Pandas. No
exemplo a seguir, instale a biblioteca statsmodels
na UDF.
import pandas as pd import pyspark import raydp from pyspark.sql.functions import pandas_udf from pyspark.sql.types import StringType def test_udf(spark: pyspark.sql.SparkSession): import pandas as pd df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")) return df.select(func('Lottery','Literacy', 'Pop1831')).collect() @pandas_udf(StringType()) def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: import numpy as np import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"]) import statsmodels.api as sm import statsmodels.formula.api as smf d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3} data = pd.DataFrame(d) # Fit regression model (using the natural log of one of the regressors) results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit() return results.summary().as_csv() if __name__ == '__main__': spark = raydp.init_spark( app_name="RayDP UDF Example", num_executors=2, executor_cores=4, executor_memory="1500M", ) print(test_udf(spark)) raydp.stop_spark()