אחרי שמבצעים אימון של מודל באשכול Ray ב-Gemini Enterprise Agent Platform, אפשר לפרוס את המודל כדי לשלוח בקשות הסקה אונליין באמצעות התהליך הבא:
מייצאים את המודל מנקודת הביקורת של Ray.
מעלים את המודל אל מרשם המודלים.
פורסים את המודל בנקודת קצה.
יצירת בקשות להסקת מסקנות.
השלבים בקטע הזה מבוססים על ההנחה שאתם משתמשים ב-Ray on Agent Platform SDK בסביבת Python אינטראקטיבית.
השוואה בין הסקת מסקנות אונליין ב-Gemini Enterprise Agent Platform לבין הסקת מסקנות ב-Ray
| תכונה | הסקת מסקנות אונליין ב-Gemini Enterprise Agent Platform (מומלץ) | הסקת מסקנות באמצעות Ray (Ray Serve) |
|---|---|---|
| מדרגיות | התאמה אוטומטית לעומס על סמך תעבורה (ניתן להרחבה רבה גם למודלים של LLM) | גמישות גבוהה עם קצה עורפי מבוזר וניהול משאבים בהתאמה אישית |
| ניהול התשתיות | מנוהל באופן מלא על ידי Google Cloud, תקורה תפעולית נמוכה יותר | נדרשת הגדרה וניהול ידניים נוספים בתשתית או באשכול Kubernetes |
| API/תכונות נתמכות | ממשקי API ל-REST ול-gRPC, היקשים אונליין ובאצווה, תכונות של יכולת הסברה, עיבוד נתונים באצווה, שמירה במטמון, סטרימינג | ממשקי API של REST ו-gRPC, הסקת מסקנות בזמן אמת ובאצווה, הרכבת מודלים, אצווה, שמירה במטמון, סטרימינג |
| פורמט המודל | תמיכה במסגרות שונות כמו TensorFlow, PyTorch, scikit-learn, XGBoost באמצעות קונטיינרים מוכנים מראש או קונטיינרים בהתאמה אישית | תמיכה במסגרות שונות כמו TensorFlow, PyTorch, scikit-learn. |
| קלות השימוש | קל יותר להגדיר ולנהל, משולב עם תכונות אחרות של Gemini Enterprise Agent Platform | גמיש יותר וניתן להתאמה אישית, אבל נדרש ידע מעמיק יותר ב-Ray |
| עלות | העלות תלויה בסוגי המכונות, במאיצים ובמספר העותקים. | העלות תלויה בבחירות שלכם לגבי התשתית |
| תכונות מיוחדות | מעקב אחר מודלים, בדיקות A/B, פיצול תנועה, Gemini Enterprise Agent Platform מרשם המודלים ושילוב של Gemini Enterprise Agent Platform Pipelines | הרכבה מתקדמת של מודלים, מודלים משולבים, לוגיקת הסקה בהתאמה אישית, שילוב עם המערכת האקולוגית של Ray |
ייבוא והפעלה של לקוח Ray ב-Agent Platform
אם כבר התחברתם לאשכול Ray ב-Gemini Enterprise Agent Platform, צריך להפעיל מחדש את ליבת המערכת ולהריץ את הקוד הבא. המשתנה runtime_env נדרש בזמן החיבור כדי להריץ פקודות של הסקה אונליין.
import ray import vertexai # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) # Initialize Agent Platform to retrieve projects for downstream operations. vertexai.init(staging_bucket=BUCKET_URI) # Shutdown cluster and reconnect with required dependencies in the runtime_env. ray.shutdown()
כאשר:
CLUSTER_RESOURCE_NAME: שם המשאב המלא של אשכול Ray ב-Agent Platform, שצריך להיות ייחודי בפרויקט.
BUCKET_URI היא הקטגוריה של Cloud Storage שבה יאוחסנו ארטיפקטים של המודל.
אימון המודל וייצוא שלו למרשם המודלים
מייצאים את המודל של Gemini Enterprise Agent Platform מנקודת הבדיקה של Ray ומעלים את המודל אל מרשם המודלים.
TensorFlow
import numpy as np from ray.air import session, CheckpointConfig, ScalingConfig from ray.air.config import RunConfig from ray.train import SyncConfig from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer from ray import train import tensorflow as tf from vertex_ray.predict import tensorflow # Required dependencies at runtime runtime_env = { "pip": [ "ray==2.47.1", # pin the Ray version to prevent it from being overwritten "tensorflow", "IPython", "numpy", ], } # Initialize Ray on Agent Platform client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a TensorFlow model. def create_model(): model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]) model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"]) return model def train_func(config): n = 100 # Create a fake dataset # data : X - dim = (n, 4) # target : Y - dim = (n, 1) X = np.random.normal(0, 1, size=(n, 4)) Y = np.random.uniform(0, 1, size=(n, 1)) strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() with strategy.scope(): model = create_model() print(model) for epoch in range(config["num_epochs"]): model.fit(X, Y, batch_size=20) tf.saved_model.save(model, "temp/my_model") checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model") train.report({}, checkpoint=checkpoint) trainer = TensorflowTrainer( train_func, train_loop_config={"num_epochs": 5}, scaling_config=ScalingConfig(num_workers=1), run_config=RunConfig( storage_path=f'{BUCKET_URI}/ray_results/tensorflow', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig( sync_artifacts=True, ), ), ) # Train the model. result = trainer.fit() # Register the trained model to Model Registry. vertex_model = tensorflow.register_tensorflow( result.checkpoint, )
sklearn
from vertex_ray.predict import sklearn from ray.train.sklearn import SklearnCheckpoint vertex_model = sklearn.register_sklearn( result.checkpoint, )
XGBoost
from vertex_ray.predict import xgboost from ray.train.xgboost import XGBoostTrainer # Initialize Ray on Agent Platform client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define an XGBoost model. train_dataset = ray.data.from_pandas( pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)])) run_config = RunConfig( storage_path=f'{BUCKET_URI}/ray_results/xgboost', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig(sync_artifacts=True), ) trainer = XGBoostTrainer( label_column="y", params={"objective": "reg:squarederror"}, scaling_config=ScalingConfig(num_workers=3), datasets={"train": train_dataset}, run_config=run_config, ) # Train the model. result = trainer.fit() # Register the trained model to Model Registry. vertex_model = xgboost.register_xgboost( result.checkpoint, )
PyTorch
ממירים את נקודות הבדיקה של Ray למודל.
גרסת Build
model.mar.יוצרים LocalModel באמצעות
model.mar.העלאה למרשם המודלים.
פריסת המודל להסקת מסקנות אונליין
פורסים את המודל בנקודת הקצה אונליין. מידע נוסף זמין במאמר בנושא פריסת המודל לנקודת קצה.
DEPLOYED_NAME = model.display_name + "-endpoint" TRAFFIC_SPLIT = {"0": 100} MACHINE_TYPE = "n1-standard-4" endpoint = vertex_model.deploy( deployed_model_display_name=DEPLOYED_NAME, traffic_split=TRAFFIC_SPLIT, machine_type=MACHINE_TYPE, )
כאשר:
(אופציונלי) DEPLOYED_NAME: השם לתצוגה של המודל שנפרס. אם לא מציינים שם כשיוצרים את המודל, המערכת משתמשת ב
display_nameשל המודל.(Optional) TRAFFIC_SPLIT: מיפוי ממזהה של מודל שנפרס לאחוז התנועה בנקודת הקצה הזו שצריך להעביר למודל שנפרס. אם המזהה של מודל שפרסתם לא מופיע במפה הזו, המודל לא מקבל תעבורה. סכום הערכים של אחוז התנועה חייב להיות 100, או שהמיפוי צריך להיות ריק אם נקודת הקצה לא מקבלת תנועה כרגע. המפתח של המודל שמוטמע הוא
"0". לדוגמה,{"0": 100}.(אופציונלי) MACHINE_TYPE: מציינים את משאבי המחשוב.
שליחת בקשת הסקה
שליחת בקשת הסקה לנקודת הקצה. מידע נוסף מופיע במאמר בנושא קבלת מסקנות אונליין ממודל שאומן בהתאמה אישית.
pred_request = [ [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471], [-0.81881499, 0.43874669, -0.25108584, 1.75536031] ] endpoint.predict(pred_request)
הפלט אמור להיראות כך:
Prediction(predictions=[0.7891440987586975, 0.5843208432197571], deployed_model_id='3829557218101952512', model_version_id='1', model_resource_name='projects/123456789/locations/us-central1/models/123456789101112', explanations=None)