פריסת מודל ב-Gemini Enterprise Agent Platform כדי לקבל מסקנות

אחרי שמבצעים אימון של מודל באשכול Ray ב-Gemini Enterprise Agent Platform, אפשר לפרוס את המודל כדי לשלוח בקשות הסקה אונליין באמצעות התהליך הבא:

לפני שמתחילים, חשוב לקרוא את סקירת Ray ב-Agent Platform ולהגדיר את כל הכלים הנדרשים.

השלבים בקטע הזה מבוססים על ההנחה שאתם משתמשים ב-Ray on Agent Platform SDK בסביבת Python אינטראקטיבית.

השוואה בין הסקת מסקנות אונליין ב-Gemini Enterprise Agent Platform לבין הסקת מסקנות ב-Ray

תכונה הסקת מסקנות אונליין ב-Gemini Enterprise Agent Platform (מומלץ) הסקת מסקנות באמצעות Ray‏ (Ray Serve)
מדרגיות התאמה אוטומטית לעומס על סמך תעבורה (ניתן להרחבה רבה גם למודלים של LLM) גמישות גבוהה עם קצה עורפי מבוזר וניהול משאבים בהתאמה אישית
ניהול התשתיות מנוהל באופן מלא על ידי Google Cloud, תקורה תפעולית נמוכה יותר נדרשת הגדרה וניהול ידניים נוספים בתשתית או באשכול Kubernetes
API/תכונות נתמכות ממשקי API ל-REST ול-gRPC, היקשים אונליין ובאצווה, תכונות של יכולת הסברה, עיבוד נתונים באצווה, שמירה במטמון, סטרימינג ממשקי API של REST ו-gRPC, הסקת מסקנות בזמן אמת ובאצווה, הרכבת מודלים, אצווה, שמירה במטמון, סטרימינג
פורמט המודל תמיכה במסגרות שונות כמו TensorFlow, ‏ PyTorch, ‏ scikit-learn, ‏ XGBoost באמצעות קונטיינרים מוכנים מראש או קונטיינרים בהתאמה אישית תמיכה במסגרות שונות כמו TensorFlow, ‏ PyTorch, ‏ scikit-learn.
קלות השימוש קל יותר להגדיר ולנהל, משולב עם תכונות אחרות של Gemini Enterprise Agent Platform גמיש יותר וניתן להתאמה אישית, אבל נדרש ידע מעמיק יותר ב-Ray
עלות העלות תלויה בסוגי המכונות, במאיצים ובמספר העותקים. העלות תלויה בבחירות שלכם לגבי התשתית
תכונות מיוחדות מעקב אחר מודלים, בדיקות A/B, פיצול תנועה, Gemini Enterprise Agent Platform מרשם המודלים ושילוב של Gemini Enterprise Agent Platform Pipelines הרכבה מתקדמת של מודלים, מודלים משולבים, לוגיקת הסקה בהתאמה אישית, שילוב עם המערכת האקולוגית של Ray

ייבוא והפעלה של לקוח Ray ב-Agent Platform

אם כבר התחברתם לאשכול Ray ב-Gemini Enterprise Agent Platform, צריך להפעיל מחדש את ליבת המערכת ולהריץ את הקוד הבא. המשתנה runtime_env נדרש בזמן החיבור כדי להריץ פקודות של הסקה אונליין.

import ray
import vertexai

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

# Initialize Agent Platform to retrieve projects for downstream operations.
vertexai.init(staging_bucket=BUCKET_URI)

# Shutdown cluster and reconnect with required dependencies in the runtime_env.
ray.shutdown()

כאשר:

  • CLUSTER_RESOURCE_NAME: שם המשאב המלא של אשכול Ray ב-Agent Platform, שצריך להיות ייחודי בפרויקט.

  • BUCKET_URI היא הקטגוריה של Cloud Storage שבה יאוחסנו ארטיפקטים של המודל.

אימון המודל וייצוא שלו למרשם המודלים

מייצאים את המודל של Gemini Enterprise Agent Platform מנקודת הבדיקה של Ray ומעלים את המודל אל מרשם המודלים.

TensorFlow

import numpy as np
from ray.air import session, CheckpointConfig, ScalingConfig
from ray.air.config import RunConfig
from ray.train import SyncConfig
from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer
from ray import train
import tensorflow as tf

from vertex_ray.predict import tensorflow

# Required dependencies at runtime
runtime_env = {
  "pip": [
      "ray==2.47.1", # pin the Ray version to prevent it from being overwritten
      "tensorflow",
      "IPython",
      "numpy",
  ],
}

# Initialize  Ray on Agent Platform client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a TensorFlow model.

def create_model():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
  model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
  return model

def train_func(config):
  n = 100
  # Create a fake dataset
  # data   : X - dim = (n, 4)
  # target : Y - dim = (n, 1)
  X = np.random.normal(0, 1, size=(n, 4))
  Y = np.random.uniform(0, 1, size=(n, 1))

  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
  with strategy.scope():
      model = create_model()
      print(model)

  for epoch in range(config["num_epochs"]):
      model.fit(X, Y, batch_size=20)
      tf.saved_model.save(model, "temp/my_model")
      checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model")
      train.report({}, checkpoint=checkpoint)

trainer = TensorflowTrainer(
  train_func,
  train_loop_config={"num_epochs": 5},
  scaling_config=ScalingConfig(num_workers=1),
  run_config=RunConfig(
      storage_path=f'{BUCKET_URI}/ray_results/tensorflow',
      checkpoint_config=CheckpointConfig(
          num_to_keep=1  # Keep all checkpoints.
      ),
      sync_config=SyncConfig(
          sync_artifacts=True,
      ),
  ),
)

# Train the model.
result = trainer.fit()

# Register the trained model to Model Registry.
vertex_model = tensorflow.register_tensorflow(
  result.checkpoint,
)

sklearn

from vertex_ray.predict import sklearn
from ray.train.sklearn import SklearnCheckpoint

vertex_model = sklearn.register_sklearn(
  result.checkpoint,
)

XGBoost

from vertex_ray.predict import xgboost
from ray.train.xgboost import XGBoostTrainer

# Initialize  Ray on Agent Platform client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define an XGBoost model.
train_dataset = ray.data.from_pandas(
pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)]))

run_config = RunConfig(
storage_path=f'{BUCKET_URI}/ray_results/xgboost',
checkpoint_config=CheckpointConfig(
    num_to_keep=1  # Keep all checkpoints.
),
sync_config=SyncConfig(sync_artifacts=True),
)

trainer = XGBoostTrainer(
label_column="y",
params={"objective": "reg:squarederror"},
scaling_config=ScalingConfig(num_workers=3),
datasets={"train": train_dataset},
run_config=run_config,
)
# Train the model.
result = trainer.fit()

# Register the trained model to Model Registry.
vertex_model = xgboost.register_xgboost(
result.checkpoint,
)

PyTorch

  • ממירים את נקודות הבדיקה של Ray למודל.

  • גרסת Build‏ model.mar.

  • יוצרים LocalModel באמצעות model.mar.

  • העלאה למרשם המודלים.

פריסת המודל להסקת מסקנות אונליין

פורסים את המודל בנקודת הקצה אונליין. מידע נוסף זמין במאמר בנושא פריסת המודל לנקודת קצה.

DEPLOYED_NAME = model.display_name + "-endpoint"
TRAFFIC_SPLIT = {"0": 100}
MACHINE_TYPE = "n1-standard-4"

endpoint = vertex_model.deploy(
    deployed_model_display_name=DEPLOYED_NAME,
    traffic_split=TRAFFIC_SPLIT,
    machine_type=MACHINE_TYPE,
)

כאשר:

  • (אופציונלי) DEPLOYED_NAME: השם לתצוגה של המודל שנפרס. אם לא מציינים שם כשיוצרים את המודל, המערכת משתמשת בdisplay_name של המודל.

  • ‫(Optional) TRAFFIC_SPLIT: מיפוי ממזהה של מודל שנפרס לאחוז התנועה בנקודת הקצה הזו שצריך להעביר למודל שנפרס. אם המזהה של מודל שפרסתם לא מופיע במפה הזו, המודל לא מקבל תעבורה. סכום הערכים של אחוז התנועה חייב להיות 100, או שהמיפוי צריך להיות ריק אם נקודת הקצה לא מקבלת תנועה כרגע. המפתח של המודל שמוטמע הוא "0". לדוגמה, {"0": 100}.

  • (אופציונלי) MACHINE_TYPE: מציינים את משאבי המחשוב.

שליחת בקשת הסקה

שליחת בקשת הסקה לנקודת הקצה. מידע נוסף מופיע במאמר בנושא קבלת מסקנות אונליין ממודל שאומן בהתאמה אישית.

pred_request = [
    [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471],
    [-0.81881499, 0.43874669, -0.25108584, 1.75536031]
]

endpoint.predict(pred_request)

הפלט אמור להיראות כך:

Prediction(predictions=[0.7891440987586975, 0.5843208432197571],
 deployed_model_id='3829557218101952512',
 model_version_id='1',
 model_resource_name='projects/123456789/locations/us-central1/models/123456789101112',
 explanations=None)

המאמרים הבאים