Execute o Spark no cluster do Ray no Vertex AI

A biblioteca Python RayDP permite executar o Spark num cluster Ray. Este documento aborda a instalação, a configuração e a execução do RayDP no Ray no Vertex AI (cluster do Ray no Vertex AI).

Instalação

O Ray no Vertex AI permite que os utilizadores executem as respetivas aplicações através da framework Ray de código aberto. O RayDP fornece APIs para executar o Spark no Ray. As imagens de contentores pré-criadas disponíveis para criar um cluster do Ray na Vertex AI não têm o RayDP pré-instalado. Isto significa que tem de criar uma imagem de cluster do Ray personalizada no Vertex AI para que o cluster do Ray no Vertex AI execute aplicações RayDP no cluster do Ray no Vertex AI. A secção seguinte explica como criar uma imagem personalizada do RayDP.

Crie uma imagem de contentor personalizada do Ray no Vertex AI

Use este Dockerfile para criar uma imagem de contentor personalizada para o Ray no Vertex AI com o RayDP instalado.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

Pode usar o cluster do Ray mais recente na imagem pré-criada do Vertex AI para criar a imagem personalizada do RayDP. Também pode instalar outros pacotes Python que prevê que vai usar nas suas aplicações. O erro pyarrow==14.0 deve-se a uma restrição de dependência do Ray 2.9.3.

Crie e envie a imagem de contentor personalizada

Crie um repositório Docker no Artifact Registry antes de criar a sua imagem personalizada (consulte o artigo Trabalhe com imagens de contentores para saber como criar e configurar o seu repositório Docker). Depois de criar o repositório do Docker, crie e envie a imagem do contentor personalizado através do Dockerfile.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

Onde:

  • LOCATION: a localização do Cloud Storage (por exemplo, us-central1) que criou no Artifact Registry.
  • PROJECT_ID: o ID do seu Google Cloud projeto.
  • DOCKER_REPOSITORY: o nome do repositório do Docker que criou.
  • IMAGE_NAME: o nome das suas imagens de contentores personalizadas.

Crie um cluster do Ray na Vertex AI

Use a imagem do contentor personalizado criada no passo anterior para criar um cluster do Ray no Vertex AI. Pode usar o SDK Vertex AI para Python para criar um cluster do Ray na Vertex AI.

Se ainda não o fez, instale as bibliotecas Python necessárias.

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

Configure os nós principais e de trabalho e crie o cluster através do SDK Vertex AI para Python. Por exemplo:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

Onde:

  • CUSTOM_CONTAINER_IMAGE_URI: o URI da imagem do contentor personalizado enviada para o Artifact Registry.
  • CLUSTER_NAME: o nome do seu cluster do Ray no Vertex AI.

Cluster do Spark no Ray no Vertex AI

Antes de executar a aplicação Spark, crie uma sessão do Spark com a API RayDP. Pode usar o cliente Ray para o fazer interativamente ou usar a API de tarefas do Ray. A API Ray Job é recomendada, especialmente para aplicações de produção e de execução prolongada. A API RayDP fornece parâmetros para configurar a sessão do Spark, além de suportar a configuração do Spark. Saiba mais sobre a API RayDP para criar uma sessão do Spark. Consulte o artigo Afinidade de nós dos atores principais do Spark.

RayDP com cliente Ray

Pode usar Task ou Actor do Ray para criar um cluster e uma sessão do Spark no cluster do Ray no Vertex AI. A tarefa do Ray, ou o ator, é necessária para usar um cliente do Ray para criar uma sessão do Spark no cluster do Ray no Vertex AI. O código seguinte mostra como um ator do Ray pode criar uma sessão do Spark, executar uma aplicação do Spark e parar um cluster do Spark num cluster do Ray no Vertex AI através do RayDP.

Para se ligar interativamente ao cluster do Ray no Vertex AI, consulte o artigo Ligue-se a um cluster do Ray através do cliente do Ray

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

RayDP com a API Ray Job

O cliente Ray é útil para pequenas experiências que requerem uma ligação interativa com o cluster Ray. A API Ray Job é a forma recomendada de executar tarefas de produção e de longa duração num cluster do Ray. Isto também se aplica à execução de aplicações Spark no cluster do Ray no Vertex AI.

Crie um script Python que contenha o código da sua aplicação Spark. Por exemplo:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

Envie a tarefa para executar o script Python através da API Ray Job. Por exemplo:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

Onde:

  • SCRIPT_NAME: o nome do ficheiro do script que criou.

Ler ficheiros do Cloud Storage a partir da aplicação Spark

É uma prática comum armazenar ficheiros de dados num contentor do Google Cloud Storage. Pode ler estes ficheiros de várias formas a partir de uma aplicação Spark que esteja a ser executada no cluster do Ray no Vertex AI. Esta secção explica duas técnicas para ler ficheiros do Cloud Storage a partir de aplicações Spark executadas no cluster do Ray no Vertex AI.

Use o conetor do Google Cloud Storage

Pode usar o Google Cloud conetor para Hadoop para ler ficheiros de um contentor do Cloud Storage a partir da sua aplicação Spark. Depois de criar uma sessão do Spark com o RayDP, pode ler ficheiros com alguns parâmetros de configuração. O código seguinte mostra como ler um ficheiro CSV armazenado num contentor do Cloud Storage a partir de uma aplicação Spark no cluster do Ray na Vertex AI.

import raydp

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

Onde:

  • GCS_FILE_URI: o URI de um ficheiro armazenado num contentor do Cloud Storage. Por exemplo: gs://my-bucket/my-file.csv.

Use dados do Ray

O Google Cloud conector oferece uma forma de ler ficheiros de um Google Cloud contentor e pode ser suficiente para a maioria dos exemplos de utilização. Pode usar os dados do Ray para ler ficheiros do contentor Google Cloud quando precisar de usar o processamento distribuído do Ray para ler dados ou quando tiver problemas ao ler o ficheiroGoogle Cloud com o conetor Google Cloud . Isto pode acontecer devido a conflitos de dependência do Java quando são adicionadas dependências de outras aplicações ao caminho de classe do Java do Spark através de spark.jars.packages ou spark.jars.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the Cloud Storage connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

UDFs do Pyspark Pandas no cluster do Ray na Vertex AI

As UDFs do Pyspark Pandas requerem, por vezes, código adicional quando as usa na sua aplicação Spark em execução num cluster do Ray no Vertex AI. Normalmente, isto é necessário quando a UDF do Pandas usa uma biblioteca Python que não está disponível no cluster do Ray no Vertex AI. Pode criar pacotes das dependências do Python de uma aplicação através do ambiente de execução com a API Ray Job. Depois de enviar a tarefa do Ray para o cluster, o Ray instala essas dependências no ambiente virtual do Python que cria para executar a tarefa. No entanto, as UDFs do Pandas> não usam o mesmo ambiente virtual. Em alternativa, usam o ambiente do sistema Python predefinido. Se essa dependência não estiver disponível no ambiente do sistema, pode ter de a instalar no seu UDF do Pandas. No exemplo seguinte, instale a biblioteca statsmodels na FDU.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd

    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf

    d = {'Lottery': s1,
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':

    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )

    print(test_udf(spark))

    raydp.stop_spark()