A biblioteca Python RayDP permite executar o Spark num cluster Ray. Este documento aborda a instalação, a configuração e a execução do RayDP no Ray no Vertex AI (cluster do Ray no Vertex AI).
Instalação
O Ray no Vertex AI permite que os utilizadores executem as respetivas aplicações através da framework Ray de código aberto. O RayDP fornece APIs para executar o Spark no Ray. As imagens de contentores pré-criadas disponíveis para criar um cluster do Ray na Vertex AI não têm o RayDP pré-instalado. Isto significa que tem de criar uma imagem de cluster do Ray personalizada no Vertex AI para que o cluster do Ray no Vertex AI execute aplicações RayDP no cluster do Ray no Vertex AI. A secção seguinte explica como criar uma imagem personalizada do RayDP.
Crie uma imagem de contentor personalizada do Ray no Vertex AI
Use este Dockerfile para criar uma imagem de contentor personalizada para o Ray no Vertex AI com o RayDP instalado.
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest RUN apt-get update -y \ && pip install --no-cache-dir raydp pyarrow==14.0
Pode usar o cluster do Ray mais recente na imagem pré-criada do Vertex AI para criar a imagem personalizada do RayDP. Também pode instalar outros pacotes Python que prevê que vai usar nas suas aplicações. O erro pyarrow==14.0
deve-se a uma restrição de dependência do Ray 2.9.3.
Crie e envie a imagem de contentor personalizada
Crie um repositório Docker no Artifact Registry antes de criar a sua imagem personalizada (consulte o artigo Trabalhe com imagens de contentores para saber como criar e configurar o seu repositório Docker). Depois de criar o repositório do Docker, crie e envie a imagem do contentor personalizado através do Dockerfile.
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME] docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
Onde:
LOCATION
: a localização do Cloud Storage (por exemplo, us-central1) que criou no Artifact Registry.PROJECT_ID
: o ID do seu Google Cloud projeto.DOCKER_REPOSITORY
: o nome do repositório do Docker que criou.IMAGE_NAME
: o nome das suas imagens de contentores personalizadas.
Crie um cluster do Ray na Vertex AI
Use a imagem do contentor personalizado criada no passo anterior para criar um cluster do Ray no Vertex AI. Pode usar o SDK Vertex AI para Python para criar um cluster do Ray na Vertex AI.
Se ainda não o fez, instale as bibliotecas Python necessárias.
pip install --quiet google-cloud-aiplatform \ ray[all]==2.9.3 \ google-cloud-aiplatform[ray]
Configure os nós principais e de trabalho e crie o cluster através do SDK Vertex AI para Python. Por exemplo:
import logging import ray from google.cloud import aiplatform from google.cloud.aiplatform import vertex_ray from vertex_ray import Resources head_node_type = Resources( machine_type="n1-standard-16", node_count=1, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], ) worker_node_types = [Resources( machine_type="n1-standard-8", node_count=2, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], )] ray_cluster_resource_name = vertex_ray.create_ray_cluster( head_node_type=head_node_type, worker_node_types=worker_node_types, cluster_name=[CLUSTER_NAME], )
Onde:
CUSTOM_CONTAINER_IMAGE_URI
: o URI da imagem do contentor personalizado enviada para o Artifact Registry.CLUSTER_NAME
: o nome do seu cluster do Ray no Vertex AI.
Cluster do Spark no Ray no Vertex AI
Antes de executar a aplicação Spark, crie uma sessão do Spark com a API RayDP. Pode usar o cliente Ray para o fazer interativamente ou usar a API de tarefas do Ray. A API Ray Job é recomendada, especialmente para aplicações de produção e de execução prolongada. A API RayDP fornece parâmetros para configurar a sessão do Spark, além de suportar a configuração do Spark. Saiba mais sobre a API RayDP para criar uma sessão do Spark. Consulte o artigo Afinidade de nós dos atores principais do Spark.
RayDP com cliente Ray
Pode usar Task ou Actor do Ray para criar um cluster e uma sessão do Spark no cluster do Ray no Vertex AI. A tarefa do Ray, ou o ator, é necessária para usar um cliente do Ray para criar uma sessão do Spark no cluster do Ray no Vertex AI. O código seguinte mostra como um ator do Ray pode criar uma sessão do Spark, executar uma aplicação do Spark e parar um cluster do Spark num cluster do Ray no Vertex AI através do RayDP.
Para se ligar interativamente ao cluster do Ray no Vertex AI, consulte o artigo Ligue-se a um cluster do Ray através do cliente do Ray
@ray.remote class SparkExecutor: import pyspark spark: pyspark.sql.SparkSession = None def __init__(self): import ray import raydp self.spark = raydp.init_spark( app_name="RAYDP ACTOR EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) def get_data(self): df = self.spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(self): import raydp raydp.stop_spark() s = SparkExecutor.remote() data = ray.get(s.get_data.remote()) print(data) ray.get(s.stop_spark.remote())
RayDP com a API Ray Job
O cliente Ray é útil para pequenas experiências que requerem uma ligação interativa com o cluster Ray. A API Ray Job é a forma recomendada de executar tarefas de produção e de longa duração num cluster do Ray. Isto também se aplica à execução de aplicações Spark no cluster do Ray no Vertex AI.
Crie um script Python que contenha o código da sua aplicação Spark. Por exemplo:
import pyspark import raydp def get_data(spark: pyspark.sql.SparkSession): df = spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(): raydp.stop_spark() if __name__ == '__main__': spark = raydp.init_spark( app_name="RAYDP JOB EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) print(get_data(spark)) stop_spark()
Envie a tarefa para executar o script Python através da API Ray Job. Por exemplo:
from ray.job_submission import JobSubmissionClient client = JobSubmissionClient(RAY_ADDRESS) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python [SCRIPT_NAME].py", # Path to the local directory that contains the python script file. runtime_env={ "working_dir": ".", } )
Onde:
SCRIPT_NAME
: o nome do ficheiro do script que criou.
Ler ficheiros do Cloud Storage a partir da aplicação Spark
É uma prática comum armazenar ficheiros de dados num contentor do Google Cloud Storage. Pode ler estes ficheiros de várias formas a partir de uma aplicação Spark que esteja a ser executada no cluster do Ray no Vertex AI. Esta secção explica duas técnicas para ler ficheiros do Cloud Storage a partir de aplicações Spark executadas no cluster do Ray no Vertex AI.
Use o conetor do Google Cloud Storage
Pode usar o Google Cloud conetor para Hadoop para ler ficheiros de um contentor do Cloud Storage a partir da sua aplicação Spark. Depois de criar uma sessão do Spark com o RayDP, pode ler ficheiros com alguns parâmetros de configuração. O código seguinte mostra como ler um ficheiro CSV armazenado num contentor do Cloud Storage a partir de uma aplicação Spark no cluster do Ray na Vertex AI.
import raydp spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 1", configs={ "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
Onde:
GCS_FILE_URI
: o URI de um ficheiro armazenado num contentor do Cloud Storage. Por exemplo: gs://my-bucket/my-file.csv.
Use dados do Ray
O Google Cloud conector oferece uma forma de ler ficheiros de um Google Cloud
contentor e pode ser suficiente para a maioria dos exemplos de utilização. Pode usar os dados do Ray para ler ficheiros do contentor Google Cloud quando precisar de usar o processamento distribuído do Ray para ler dados ou quando tiver problemas ao ler o ficheiroGoogle Cloud com o conetor Google Cloud . Isto pode acontecer devido a conflitos de dependência do Java quando são adicionadas dependências de outras aplicações ao caminho de classe do Java do Spark através de spark.jars.packages
ou spark.jars
.
import raydp import ray spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 2", configs={ "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) # This doesn't work even though the Cloud Storage connector Jar and other parameters have been added to the Spark configuration. #spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True) ray_dataset = ray.data.read_csv(GCS_FILE_URI) spark_df = ray_dataset.to_spark(spark)
UDFs do Pyspark Pandas no cluster do Ray na Vertex AI
As UDFs do Pyspark Pandas
requerem, por vezes, código adicional quando as usa na sua aplicação Spark
em execução num cluster do Ray no Vertex AI. Normalmente, isto é necessário quando a UDF do Pandas usa uma biblioteca Python que não está disponível no cluster do Ray no Vertex AI. Pode criar pacotes das dependências do Python de uma aplicação através do ambiente de execução com a API Ray Job. Depois de enviar a tarefa do Ray para o cluster, o Ray instala essas dependências no ambiente virtual do Python que cria para executar a tarefa. No entanto, as
UDFs do Pandas> não usam o mesmo ambiente virtual. Em alternativa, usam o ambiente do sistema Python predefinido. Se essa dependência não estiver disponível no ambiente do sistema, pode ter de a instalar no seu UDF do Pandas. No exemplo seguinte, instale a biblioteca statsmodels
na FDU.
import pandas as pd import pyspark import raydp from pyspark.sql.functions import pandas_udf from pyspark.sql.types import StringType def test_udf(spark: pyspark.sql.SparkSession): import pandas as pd df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")) return df.select(func('Lottery','Literacy', 'Pop1831')).collect() @pandas_udf(StringType()) def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: import numpy as np import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"]) import statsmodels.api as sm import statsmodels.formula.api as smf d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3} data = pd.DataFrame(d) # Fit regression model (using the natural log of one of the regressors) results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit() return results.summary().as_csv() if __name__ == '__main__': spark = raydp.init_spark( app_name="RayDP UDF Example", num_executors=2, executor_cores=4, executor_memory="1500M", ) print(test_udf(spark)) raydp.stop_spark()