הרצת Spark באשכול Ray ב-Vertex AI

ספריית Python‏ RayDP מאפשרת להריץ את Spark באשכול Ray. במאמר הזה מוסבר איך להתקין, להגדיר ולהריץ את RayDP ב-Ray ב-Vertex AI (אשכול Ray ב-Vertex AI).

התקנה

‫Ray on Vertex AI מאפשר למשתמשים להריץ את האפליקציות שלהם באמצעות מסגרת Ray בקוד פתוח. ‫RayDP מספק ממשקי API להפעלת Spark ב-Ray. קובצי האימג' של קונטיינרים שזמינים ליצירת אשכול Ray ב-Vertex AI לא מגיעים עם RayDP שמותקן מראש. כלומר, אתם צריכים ליצור תמונה של אשכול Ray בהתאמה אישית ב-Vertex AI כדי שאשכול Ray ב-Vertex AI יוכל להריץ אפליקציות RayDP באשכול Ray ב-Vertex AI. בקטע הבא מוסבר איך ליצור תמונה מותאמת אישית של RayDP.

יצירת קובץ אימג' של קונטיינר מותאם אישית של Ray ב-Vertex AI

אפשר להשתמש בקובץ Dockerfile הזה כדי ליצור קובץ אימג' של קונטיינר בהתאמה אישית ל-Ray ב-Vertex AI, שמותקן בו RayDP.

FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest

RUN apt-get update -y \
    && pip install --no-cache-dir raydp pyarrow==14.0

אתם יכולים להשתמש באשכול Ray העדכני ביותר בתמונה המוכנה מראש של Vertex AI כדי ליצור את התמונה המותאמת אישית של RayDP. אפשר גם להתקין חבילות Python אחרות שאתם צופים שתשתמשו בהן באפליקציות שלכם. הבעיה pyarrow==14.0 נובעת מאילוץ תלות ב-Ray 2.9.3.

יצירה (build) ודחיפה (push) של קובץ אימג' מותאם אישית של קונטיינר

לפני שיוצרים קובץ אימג' בהתאמה אישית, צריך ליצור מאגר Docker ב-Artifact Registry (במאמר עבודה עם קובצי אימג' בקונטיינרים מוסבר איך ליצור ולהגדיר מאגר Docker). אחרי שיוצרים את מאגר Docker, יוצרים את קובץ האימג' של קונטיינר המותאם אישית ומעבירים אותו בדחיפה באמצעות קובץ Docker.

docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]

כאשר:

  • LOCATION: המיקום ב-Cloud Storage (לדוגמה, us-central1) שיצרתם ב-Artifact Registry.
  • PROJECT_ID: מזהה הפרויקט ב- Google Cloud .
  • DOCKER_REPOSITORY: השם של מאגר Docker שיצרתם.
  • IMAGE_NAME: השם של קובצי האימג' המותאמים אישית בקונטיינר.

יצירת אשכול Ray ב-Vertex AI

משתמשים בקובץ אימג' של קונטיינר בהתאמה אישית שנוצר בשלב הקודם כדי ליצור אשכול Ray ב-Vertex AI. אתם יכולים להשתמש ב-Vertex AI SDK ל-Python כדי ליצור אשכול Ray ב-Vertex AI.

אם עדיין לא עשיתם זאת, תצטרכו להתקין את ספריות Python הנדרשות.

pip install --quiet google-cloud-aiplatform \
             ray[all]==2.9.3 \
             google-cloud-aiplatform[ray]

מגדירים את הצמתים הראשיים ואת צמתים של העובדים ויוצרים את האשכול באמצעות Vertex AI SDK ל-Python. לדוגמה:

import logging
import ray
from google.cloud import aiplatform
from google.cloud.aiplatform import vertex_ray
from vertex_ray import Resources

head_node_type = Resources(
    machine_type="n1-standard-16",
    node_count=1,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)

worker_node_types = [Resources(
    machine_type="n1-standard-8",
    node_count=2,
    custom_image=[CUSTOM_CONTAINER_IMAGE_URI],
)]

ray_cluster_resource_name = vertex_ray.create_ray_cluster(
    head_node_type=head_node_type,
    worker_node_types=worker_node_types,
    cluster_name=[CLUSTER_NAME],
)

כאשר:

  • CUSTOM_CONTAINER_IMAGE_URI: ה-URI של קובץ אימג' של קונטיינר מותאם אישית שהועלה ל-Artifact Registry.
  • CLUSTER_NAME: השם של אשכול Ray ב-Vertex AI.

אשכול Spark on Ray ב-Vertex AI

לפני שמריצים את אפליקציית Spark, יוצרים סשן Spark באמצעות RayDP API. אפשר להשתמש בלקוח Ray כדי לעשות זאת באופן אינטראקטיבי, או להשתמש ב-Ray job API. מומלץ להשתמש ב-Ray job API, במיוחד באפליקציות שפועלות בסביבת ייצור ובאפליקציות שפועלות לאורך זמן. ה-API של RayDP מספק פרמטרים להגדרת סשן Spark, ותומך גם בהגדרת Spark. מידע נוסף על RayDP API ליצירת Spark Session זמין במאמר בנושא Spark master actors node affinity.

‫RayDP עם לקוח Ray

אתם יכולים להשתמש ב-Ray Task או ב-Actor כדי ליצור אשכול Spark וסשן באשכול Ray ב-Vertex AI. כדי ליצור סשן Spark באשכול Ray ב-Vertex AI, צריך להשתמש ב-Ray Client, ב-Ray Task או ב-Actor. בדוגמת הקוד הבאה מוצג איך Ray Actor יכול ליצור Spark Session, להריץ אפליקציית Spark ולהפסיק Spark cluster ב-Ray cluster ב-Vertex AI באמצעות RayDP.

כדי להתחבר לאשכול Ray ב-Vertex AI באופן אינטראקטיבי, אפשר לעיין במאמר התחברות לאשכול Ray באמצעות Ray Client.

@ray.remote
class SparkExecutor:
  import pyspark

  spark: pyspark.sql.SparkSession = None

  def __init__(self):

    import ray
    import raydp

    self.spark = raydp.init_spark(
      app_name="RAYDP ACTOR EXAMPLE",
      num_executors=1,
      executor_cores=1,
      executor_memory="500M",
    )

  def get_data(self):
    df = self.spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

  def stop_spark(self):
    import raydp
    raydp.stop_spark()

s = SparkExecutor.remote()
data = ray.get(s.get_data.remote())
print(data)
ray.get(s.stop_spark.remote())

‫RayDP עם Ray Job API

לקוח Ray שימושי לניסויים קטנים שדורשים חיבור אינטראקטיבי לאשכול Ray. ‫Ray Job API היא הדרך המומלצת להפעלת משימות ארוכות טווח ומשימות בסביבת הייצור באשכול Ray. הדבר נכון גם לגבי הרצת אפליקציות Spark באשכול Ray ב-Vertex AI.

יוצרים סקריפט Python שמכיל את קוד האפליקציה של Spark. לדוגמה:

import pyspark
import raydp

def get_data(spark: pyspark.sql.SparkSession):
    df = spark.createDataFrame(
        [
            ("sue", 32),
            ("li", 3),
            ("bob", 75),
            ("heo", 13),
        ],
        ["first_name", "age"],
    )
    return df.toJSON().collect()

def stop_spark():
    raydp.stop_spark()

if __name__ == '__main__':
    spark = raydp.init_spark(
      app_name="RAYDP JOB EXAMPLE",
        num_executors=1,
        executor_cores=1,
        executor_memory="500M",
    )
    print(get_data(spark))
    stop_spark()

שולחים את העבודה להרצת סקריפט Python באמצעות Ray Job API. לדוגמה:

from ray.job_submission import JobSubmissionClient

client = JobSubmissionClient(RAY_ADDRESS)

job_id = client.submit_job(
  # Entrypoint shell command to execute
  entrypoint="python [SCRIPT_NAME].py",
  # Path to the local directory that contains the python script file.
  runtime_env={
    "working_dir": ".",
  }
)

כאשר:

  • SCRIPT_NAME: שם הקובץ של הסקריפט שיצרתם.

קריאת קבצים ב-Cloud Storage מאפליקציית Spark

נהוג לאחסן קובצי נתונים בקטגוריה של Google Cloud Storage. אפשר לקרוא את הקבצים האלה בכמה דרכים מאפליקציית Spark שפועלת באשכול Ray ב-Vertex AI. בקטע הזה מוסברות שתי טכניקות לקריאת קובצי Cloud Storage מאפליקציות Spark שפועלות ב-Ray Cluster ב-Vertex AI.

שימוש במחבר Google Cloud Storage

אפשר להשתמש ב- Google Cloud Connector for Hadoop כדי לקרוא קבצים מקטגוריה של Cloud Storage מאפליקציית Spark. אחרי שיוצרים סשן Spark באמצעות RayDP, אפשר לקרוא קבצים באמצעות כמה פרמטרים של הגדרה. בדוגמת הקוד הבאה מוצג איך לקרוא קובץ CSV שמאוחסן בקטגוריה של Cloud Storage מתוך אפליקציית Spark באשכול Ray ב-Vertex AI.

import raydp

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 1",
  configs={
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

כאשר:

  • GCS_FILE_URI: ה-URI של קובץ שמאוחסן בקטגוריה של Cloud Storage. לדוגמה: gs://my-bucket/my-file.csv.

שימוש בנתוני Ray

מחבר Google Cloud מספק דרך לקרוא קבצים ממאגר Google Cloud ועשוי להספיק לרוב תרחישי השימוש. אפשר להשתמש ב-Ray Data כדי לקרוא קבצים מ- Google Cloud bucket כשצריך להשתמש בעיבוד המבוזר של Ray כדי לקרוא נתונים, או כשנתקלים בבעיות בקריאת קובץGoogle Cloud באמצעות המחבר Google Cloud . יכול להיות שהבעיה נובעת מסתירות בין תלויות של Java, כשמוסיפים תלויות של אפליקציה אחרת לנתיב המחלקה של Spark Java באמצעות spark.jars.packages או spark.jars.

import raydp
import ray

spark = raydp.init_spark(
  app_name="RayDP Cloud Storage Example 2",
  configs={
      "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
      "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
      "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar",
      "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
  },
  num_executors=2,
  executor_cores=4,
  executor_memory="500M",
)

# This doesn't work even though the Cloud Storage connector Jar and other parameters have
been added to the Spark configuration.
#spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)

ray_dataset = ray.data.read_csv(GCS_FILE_URI)
spark_df = ray_dataset.to_spark(spark)

פונקציית Pyspark Pandas UDF באשכול Ray ב-Vertex AI

פונקציות מוגדרות על ידי המשתמש (UDF) של Pyspark Pandas דורשות לפעמים קוד נוסף כשמשתמשים בהן באפליקציית Spark שפועלת באשכול Ray ב-Vertex AI. בדרך כלל נדרש להשתמש באפשרות הזו כשפונקציית Pandas UDF משתמשת בספריית Python שלא זמינה באשכול Ray ב-Vertex AI. אתם יכולים לארוז את יחסי התלות של Python של אפליקציה באמצעות סביבת זמן הריצה עם Ray Job API. אחרי ששולחים את משימת Ray לאשכול, Ray מתקין את התלות האלה בסביבה הווירטואלית של Python שהוא יוצר להרצת המשימה. אבל פונקציות Pandas UDF לא משתמשות באותה סביבה וירטואלית. במקום זאת, הם משתמשים בסביבת ברירת המחדל של מערכת Python. אם התלות הזו לא זמינה בסביבת המערכת, יכול להיות שתצטרכו להתקין אותה ב-UDF של Pandas. בדוגמה הבאה, ספריית statsmodels מותקנת בתוך ה-UDF.

import pandas as pd
import pyspark
import raydp
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

def test_udf(spark: pyspark.sql.SparkSession):
    import pandas as pd

    df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv"))
    return df.select(func('Lottery','Literacy', 'Pop1831')).collect()

@pandas_udf(StringType())
def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str:
    import numpy as np
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"])
    import statsmodels.api as sm
    import statsmodels.formula.api as smf

    d = {'Lottery': s1,
         'Literacy': s2,
         'Pop1831': s3}
    data = pd.DataFrame(d)

    # Fit regression model (using the natural log of one of the regressors)
    results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit()
    return results.summary().as_csv()

if __name__ == '__main__':

    spark = raydp.init_spark(
      app_name="RayDP UDF Example",
      num_executors=2,
      executor_cores=4,
      executor_memory="1500M",
    )

    print(test_udf(spark))

    raydp.stop_spark()