ספריית Python RayDP מאפשרת להריץ את Spark באשכול Ray. במאמר הזה מוסבר איך להתקין, להגדיר ולהריץ את RayDP ב-Ray ב-Vertex AI (אשכול Ray ב-Vertex AI).
התקנה
Ray on Vertex AI מאפשר למשתמשים להריץ את האפליקציות שלהם באמצעות מסגרת Ray בקוד פתוח. RayDP מספק ממשקי API להפעלת Spark ב-Ray. קובצי האימג' של קונטיינרים שזמינים ליצירת אשכול Ray ב-Vertex AI לא מגיעים עם RayDP שמותקן מראש. כלומר, אתם צריכים ליצור תמונה של אשכול Ray בהתאמה אישית ב-Vertex AI כדי שאשכול Ray ב-Vertex AI יוכל להריץ אפליקציות RayDP באשכול Ray ב-Vertex AI. בקטע הבא מוסבר איך ליצור תמונה מותאמת אישית של RayDP.
יצירת קובץ אימג' של קונטיינר מותאם אישית של Ray ב-Vertex AI
אפשר להשתמש בקובץ Dockerfile הזה כדי ליצור קובץ אימג' של קונטיינר בהתאמה אישית ל-Ray ב-Vertex AI, שמותקן בו RayDP.
FROM us-docker.pkg.dev/vertex-ai/training/ray-cpu.2-9.py310:latest RUN apt-get update -y \ && pip install --no-cache-dir raydp pyarrow==14.0
אתם יכולים להשתמש באשכול Ray העדכני ביותר בתמונה המוכנה מראש של Vertex AI כדי ליצור את התמונה המותאמת אישית של RayDP. אפשר גם להתקין חבילות Python אחרות שאתם צופים שתשתמשו בהן באפליקציות שלכם. הבעיה pyarrow==14.0 נובעת מאילוץ תלות ב-Ray 2.9.3.
יצירה (build) ודחיפה (push) של קובץ אימג' מותאם אישית של קונטיינר
לפני שיוצרים קובץ אימג' בהתאמה אישית, צריך ליצור מאגר Docker ב-Artifact Registry (במאמר עבודה עם קובצי אימג' בקונטיינרים מוסבר איך ליצור ולהגדיר מאגר Docker). אחרי שיוצרים את מאגר Docker, יוצרים את קובץ האימג' של קונטיינר המותאם אישית ומעבירים אותו בדחיפה באמצעות קובץ Docker.
docker build . -t [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME] docker push [LOCATION]-docker.pkg.dev/[PROJECT_ID]/[DOCKER_REPOSITORY]/[IMAGE_NAME]
כאשר:
-
LOCATION: המיקום ב-Cloud Storage (לדוגמה, us-central1) שיצרתם ב-Artifact Registry. -
PROJECT_ID: מזהה הפרויקט ב- Google Cloud . -
DOCKER_REPOSITORY: השם של מאגר Docker שיצרתם. -
IMAGE_NAME: השם של קובצי האימג' המותאמים אישית בקונטיינר.
יצירת אשכול Ray ב-Vertex AI
משתמשים בקובץ אימג' של קונטיינר בהתאמה אישית שנוצר בשלב הקודם כדי ליצור אשכול Ray ב-Vertex AI. אתם יכולים להשתמש ב-Vertex AI SDK ל-Python כדי ליצור אשכול Ray ב-Vertex AI.
אם עדיין לא עשיתם זאת, תצטרכו להתקין את ספריות Python הנדרשות.
pip install --quiet google-cloud-aiplatform \ ray[all]==2.9.3 \ google-cloud-aiplatform[ray]
מגדירים את הצמתים הראשיים ואת צמתים של העובדים ויוצרים את האשכול באמצעות Vertex AI SDK ל-Python. לדוגמה:
import logging import ray from google.cloud import aiplatform from google.cloud.aiplatform import vertex_ray from vertex_ray import Resources head_node_type = Resources( machine_type="n1-standard-16", node_count=1, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], ) worker_node_types = [Resources( machine_type="n1-standard-8", node_count=2, custom_image=[CUSTOM_CONTAINER_IMAGE_URI], )] ray_cluster_resource_name = vertex_ray.create_ray_cluster( head_node_type=head_node_type, worker_node_types=worker_node_types, cluster_name=[CLUSTER_NAME], )
כאשר:
-
CUSTOM_CONTAINER_IMAGE_URI: ה-URI של קובץ אימג' של קונטיינר מותאם אישית שהועלה ל-Artifact Registry. -
CLUSTER_NAME: השם של אשכול Ray ב-Vertex AI.
אשכול Spark on Ray ב-Vertex AI
לפני שמריצים את אפליקציית Spark, יוצרים סשן Spark באמצעות RayDP API. אפשר להשתמש בלקוח Ray כדי לעשות זאת באופן אינטראקטיבי, או להשתמש ב-Ray job API. מומלץ להשתמש ב-Ray job API, במיוחד באפליקציות שפועלות בסביבת ייצור ובאפליקציות שפועלות לאורך זמן. ה-API של RayDP מספק פרמטרים להגדרת סשן Spark, ותומך גם בהגדרת Spark. מידע נוסף על RayDP API ליצירת Spark Session זמין במאמר בנושא Spark master actors node affinity.
RayDP עם לקוח Ray
אתם יכולים להשתמש ב-Ray Task או ב-Actor כדי ליצור אשכול Spark וסשן באשכול Ray ב-Vertex AI. כדי ליצור סשן Spark באשכול Ray ב-Vertex AI, צריך להשתמש ב-Ray Client, ב-Ray Task או ב-Actor. בדוגמת הקוד הבאה מוצג איך Ray Actor יכול ליצור Spark Session, להריץ אפליקציית Spark ולהפסיק Spark cluster ב-Ray cluster ב-Vertex AI באמצעות RayDP.
כדי להתחבר לאשכול Ray ב-Vertex AI באופן אינטראקטיבי, אפשר לעיין במאמר התחברות לאשכול Ray באמצעות Ray Client.
@ray.remote class SparkExecutor: import pyspark spark: pyspark.sql.SparkSession = None def __init__(self): import ray import raydp self.spark = raydp.init_spark( app_name="RAYDP ACTOR EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) def get_data(self): df = self.spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(self): import raydp raydp.stop_spark() s = SparkExecutor.remote() data = ray.get(s.get_data.remote()) print(data) ray.get(s.stop_spark.remote())
RayDP עם Ray Job API
לקוח Ray שימושי לניסויים קטנים שדורשים חיבור אינטראקטיבי לאשכול Ray. Ray Job API היא הדרך המומלצת להפעלת משימות ארוכות טווח ומשימות בסביבת הייצור באשכול Ray. הדבר נכון גם לגבי הרצת אפליקציות Spark באשכול Ray ב-Vertex AI.
יוצרים סקריפט Python שמכיל את קוד האפליקציה של Spark. לדוגמה:
import pyspark import raydp def get_data(spark: pyspark.sql.SparkSession): df = spark.createDataFrame( [ ("sue", 32), ("li", 3), ("bob", 75), ("heo", 13), ], ["first_name", "age"], ) return df.toJSON().collect() def stop_spark(): raydp.stop_spark() if __name__ == '__main__': spark = raydp.init_spark( app_name="RAYDP JOB EXAMPLE", num_executors=1, executor_cores=1, executor_memory="500M", ) print(get_data(spark)) stop_spark()
שולחים את העבודה להרצת סקריפט Python באמצעות Ray Job API. לדוגמה:
from ray.job_submission import JobSubmissionClient client = JobSubmissionClient(RAY_ADDRESS) job_id = client.submit_job( # Entrypoint shell command to execute entrypoint="python [SCRIPT_NAME].py", # Path to the local directory that contains the python script file. runtime_env={ "working_dir": ".", } )
כאשר:
-
SCRIPT_NAME: שם הקובץ של הסקריפט שיצרתם.
קריאת קבצים ב-Cloud Storage מאפליקציית Spark
נהוג לאחסן קובצי נתונים בקטגוריה של Google Cloud Storage. אפשר לקרוא את הקבצים האלה בכמה דרכים מאפליקציית Spark שפועלת באשכול Ray ב-Vertex AI. בקטע הזה מוסברות שתי טכניקות לקריאת קובצי Cloud Storage מאפליקציות Spark שפועלות ב-Ray Cluster ב-Vertex AI.
שימוש במחבר Google Cloud Storage
אפשר להשתמש ב- Google Cloud Connector for Hadoop כדי לקרוא קבצים מקטגוריה של Cloud Storage מאפליקציית Spark. אחרי שיוצרים סשן Spark באמצעות RayDP, אפשר לקרוא קבצים באמצעות כמה פרמטרים של הגדרה. בדוגמת הקוד הבאה מוצג איך לקרוא קובץ CSV שמאוחסן בקטגוריה של Cloud Storage מתוך אפליקציית Spark באשכול Ray ב-Vertex AI.
import raydp spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 1", configs={ "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) spark_df = spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True)
כאשר:
-
GCS_FILE_URI: ה-URI של קובץ שמאוחסן בקטגוריה של Cloud Storage. לדוגמה: gs://my-bucket/my-file.csv.
שימוש בנתוני Ray
מחבר Google Cloud מספק דרך לקרוא קבצים ממאגר Google Cloud
ועשוי להספיק לרוב תרחישי השימוש. אפשר להשתמש ב-Ray Data כדי לקרוא קבצים מ- Google Cloud bucket כשצריך להשתמש בעיבוד המבוזר של Ray כדי לקרוא נתונים, או כשנתקלים בבעיות בקריאת קובץGoogle Cloud באמצעות המחבר Google Cloud . יכול להיות שהבעיה נובעת מסתירות בין תלויות של Java, כשמוסיפים תלויות של אפליקציה אחרת לנתיב המחלקה של Spark Java באמצעות spark.jars.packages או spark.jars.
import raydp import ray spark = raydp.init_spark( app_name="RayDP Cloud Storage Example 2", configs={ "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3", "spark.jars.repositories": "https://mmlspark.azureedge.net/maven", "spark.jars": "https://storage.googleapis.com/hadoop-lib/gcs/gcs-connector-hadoop3-2.2.22.jar", "spark.hadoop.fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS", "spark.hadoop.fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem", }, num_executors=2, executor_cores=4, executor_memory="500M", ) # This doesn't work even though the Cloud Storage connector Jar and other parameters have been added to the Spark configuration. #spark.read.csv([GCS_FILE_URI], header = True, inferSchema = True) ray_dataset = ray.data.read_csv(GCS_FILE_URI) spark_df = ray_dataset.to_spark(spark)
פונקציית Pyspark Pandas UDF באשכול Ray ב-Vertex AI
פונקציות מוגדרות על ידי המשתמש (UDF) של Pyspark Pandas
דורשות לפעמים קוד נוסף כשמשתמשים בהן באפליקציית Spark שפועלת באשכול Ray ב-Vertex AI. בדרך כלל נדרש להשתמש באפשרות הזו כשפונקציית Pandas UDF משתמשת בספריית Python שלא זמינה באשכול Ray ב-Vertex AI. אתם יכולים לארוז את יחסי התלות של Python של אפליקציה באמצעות סביבת זמן הריצה עם Ray Job API. אחרי ששולחים את משימת Ray לאשכול, Ray מתקין את התלות האלה בסביבה הווירטואלית של Python שהוא יוצר להרצת המשימה. אבל פונקציות Pandas UDF לא משתמשות באותה סביבה וירטואלית. במקום זאת, הם משתמשים בסביבת ברירת המחדל של מערכת Python. אם התלות הזו לא זמינה בסביבת המערכת, יכול להיות שתצטרכו להתקין אותה ב-UDF של Pandas. בדוגמה הבאה, ספריית statsmodels מותקנת בתוך ה-UDF.
import pandas as pd import pyspark import raydp from pyspark.sql.functions import pandas_udf from pyspark.sql.types import StringType def test_udf(spark: pyspark.sql.SparkSession): import pandas as pd df = spark.createDataFrame(pd.read_csv("https://www.datavis.ca/gallery/guerry/guerry.csv")) return df.select(func('Lottery','Literacy', 'Pop1831')).collect() @pandas_udf(StringType()) def func(s1: pd.Series, s2: pd.Series, s3: pd.Series) -> str: import numpy as np import subprocess import sys subprocess.check_call([sys.executable, "-m", "pip", "install", "statsmodels"]) import statsmodels.api as sm import statsmodels.formula.api as smf d = {'Lottery': s1, 'Literacy': s2, 'Pop1831': s3} data = pd.DataFrame(d) # Fit regression model (using the natural log of one of the regressors) results = smf.ols('Lottery ~ Literacy + np.log(Pop1831)', data=data).fit() return results.summary().as_csv() if __name__ == '__main__': spark = raydp.init_spark( app_name="RayDP UDF Example", num_executors=2, executor_cores=4, executor_memory="1500M", ) print(test_udf(spark)) raydp.stop_spark()