Executar o Spark no cluster do Ray na Vertex AI

A biblioteca Python RayDP torna possível executar o Spark em um cluster do Ray. Este documento abrange a instalação, a configuração e a execução do RayDP no Ray na Vertex AI (cluster do Ray na Vertex AI).

Instalação

O Ray na Vertex AI permite que os usuários executem aplicativos usando o framework do Ray de código aberto. A RayDP fornece APIs para executar o Spark no Ray. As imagens de contêiner pré-criadas disponíveis para criar um cluster do Ray na Vertex AI não vêm com o RayDP pré-instalado. Isso significa que você precisa criar uma imagem de cluster do Ray personalizado na Vertex AI para que o cluster do Ray na Vertex AI execute aplicativos do RayDP no cluster do Ray na Vertex AI. A seção a seguir explica como criar uma imagem personalizada do RayDP.

Criar uma imagem de contêiner personalizada do Ray na Vertex AI

Use este dockerfile para criar uma imagem de contêiner personalizada para o Ray na Vertex AI com o RayDP instalado.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

É possível usar o cluster do Ray mais recente na imagem pré-criada da Vertex AI para criar a imagem personalizada do RayDP. Você também pode instalar outros pacotes Python que você prevê que usará nos seus aplicativos. O pyarrow==14.0 é devido a uma restrição de dependência do Ray 2.9.3.

Criar e enviar a imagem de contêiner personalizada

Crie um repositório do Docker no Artifact Registry antes de criar sua imagem personalizada (consulte Trabalhar com imagens de contêiner para saber como criar e configurar seu repositório do Docker). Depois de criar o repositório do Docker, crie e envie a imagem de contêiner personalizada usando o Dockerfile.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Em que:

  • LOCATION: o local do Cloud Storage (por exemplo, us-central1) que você criou no seu Artifact Registry.
  • PROJECT_ID: o ID do projeto do Google Cloud .
  • DOCKER_REPOSITORY: o nome do repositório do Docker que você criou.
  • IMAGE_NAME: o nome das suas imagens de contêiner personalizadas.

Criar um cluster do Ray na Vertex AI

Use a imagem personalizada de contêiner criada na etapa anterior para criar um cluster do Ray na Vertex AI. O SDK da Vertex AI para Python pode ser usado para criar um cluster do Ray na Vertex AI.

Instale as bibliotecas necessárias do Python, caso ainda não tenha feito isso.

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

Configure nós de cabeçalho e de trabalho e crie o cluster usando o SDK da Vertex AI para Python. Por exemplo:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

Em que:

  • CUSTOM_CONTAINER_IMAGE_URI: o URI da imagem de contêiner personalizada enviada para o Artifact Registry.
  • CLUSTER_NAME: o nome do cluster do Ray na Vertex AI.

Cluster do Spark no Ray na Vertex AI

Antes de executar seu aplicativo Spark, crie uma sessão do Spark usando a API RayDP. É possível usar o cliente Ray para fazer isso de maneira interativa ou usar a API de jobs do Ray. A API de job do Ray é recomendada, especialmente para aplicativos de produção e de longa duração. A API RayDP fornece parâmetros para configurar a sessão do Spark, além de oferecer suporte à configuração do Spark. Para saber mais sobre a API RayDP para criar uma sessão do Spark, consulte Afinidade do nó de atores mestres do Spark.

RayDP com cliente Ray

Você pode usar Ray Task ou Actor para criar um cluster e sessão do Spark no cluster do Ray na Vertex AI. O Ray Task ou Actor é necessário para usar um Cliente do Ray para criar uma sessão do Spark no cluster do Ray na Vertex AI. O código a seguir mostra como um Ray Actor pode criar uma sessão do Spark, executar um aplicativo do Spark e interromper um cluster do Spark em um cluster do Ray na Vertex AI usando o RayDP.

Para se conectar interativamente ao cluster do Ray na Vertex AI, consulte Conectar-se a um cluster do Ray pelo cliente do Ray

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

RayDP com a API Ray Job

O cliente do Ray é útil para pequenos experimentos que exigem conexão com o cluster do Ray. A API Ray Job é a maneira recomendada de executar jobs de longa duração e de produção em um cluster do Ray. Isso também se aplica à execução de aplicativos Spark no cluster do Ray na Vertex AI.

Criar um script Python que contenha o código do aplicativo Spark. Por exemplo:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Envie o job para executar o script do Python usando a API Ray Job. Por exemplo:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

Em que:

  • SCRIPT_NAME: o nome do arquivo do script que você criou.

Como ler arquivos do Cloud Storage do aplicativo Spark

É uma prática comum armazenar arquivos de dados em um bucket do Google Cloud Storage. É possível ler esses arquivos de várias maneiras em um aplicativo Spark que está em execução no cluster do Ray na Vertex AI. Esta seção explica duas técnicas para ler arquivos do Cloud Storage de aplicativos Spark em execução no cluster do Ray na Vertex AI.

Usar o conector do Google Cloud Storage

É possível usar o Google Cloud Connector para Hadoop para ler arquivos de um bucket do Cloud Storage do aplicativo Spark. Depois de criar uma sessão do Spark usando o RayDP, você pode ler arquivos usando alguns parâmetros de configuração. O código a seguir mostra como ler um arquivo CSV armazenado em um bucket do Cloud Storage em um aplicativo Spark no cluster do Ray na Vertex AI.

import raydp

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

Em que:

  • GCS_FILE_URI: o URI de um arquivo armazenado em um bucket do Cloud Storage. Por exemplo: gs://my-bucket/my-file.csv.

Usar dados do Ray

O conector Google Cloud permite ler arquivos de um bucket Google Cloud e pode ser suficiente para a maioria dos casos de uso. Talvez você queira usar o Ray Data para ler arquivos do bucket Google Cloud quando precisar usar o processamento distribuído do Ray para ler dados ou quando tiver problemas para ler o arquivoGoogle Cloud com o conector Google Cloud . Isso pode acontecer devido a conflitos de dependência do Java quando outras dependências de aplicativos são adicionadas ao classpath do Java do Spark usando spark.jars.packages ou spark.jars.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the Cloud Storage connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

UDF do Pyspark Pandas no cluster do Ray na Vertex AI

As UDFs do Pyspark Pandas às vezes exigem código adicional quando usadas no aplicativo Spark em execução em um cluster do Ray na Vertex AI. Isso geralmente é necessário quando a UDF do Pandas usa uma biblioteca Python indisponível no cluster do Ray na Vertex AI. É possível empacotar as dependências do Python de um aplicativo usando o ambiente de execução com a API Ray Job. Depois que você enviar o job do Ray ao cluster, ele vai instalar essas dependências no ambiente virtual Python criado para executar o job. As UDFs do Pandas, no entanto, não usam o mesmo ambiente virtual. Em vez disso, eles usam o ambiente de sistema Python padrão. Se essa dependência não estiver disponível no ambiente do sistema, talvez seja necessário instalá-lo na UDF do Pandas. No exemplo a seguir, instale a biblioteca statsmodels na UDF.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd

    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf

    d = {'Lottery': s1,
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':

    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )

    print(test_udf(spark))

    raydp.stop_spark()