在 Gemini Enterprise Agent Platform 部署模型,取得推論結果

在 Gemini Enterprise Agent Platform 的 Ray 叢集上訓練模型後,請按照下列程序部署模型,以處理線上推論要求:

開始之前,請務必閱讀 Ray on Agent Platform 總覽,並設定所有必要工具。

本節中的步驟假設您在互動式 Python 環境中使用 Agent Platform SDK 上的 Ray。

Gemini Enterprise Agent Platform 線上推論與 Ray 推論比較

功能 Gemini Enterprise Agent Platform 線上推論 (建議) Ray 推論 (Ray Serve)
擴充性 依據流量自動調度資源 (即使是 LLM 模型也能高度擴充) 可透過分散式後端和自訂資源管理功能大幅擴充
Infrastructure Management 由 Google Cloud全代管,可減少作業負擔 需要在基礎架構或 Kubernetes 叢集上進行更多手動設定和管理
API/支援的功能 REST 和 gRPC API、線上和批次推論、可解釋性功能、批次處理、快取、串流 REST 和 gRPC API、即時和批次推論、模型組合、批次處理、快取、串流
模型格式 支援 TensorFlow、PyTorch、scikit-learn、XGBoost 等各種架構,可使用預先建構的容器或任何自訂容器 支援 TensorFlow、PyTorch、scikit-learn 等各種架構。
易於使用 設定和管理更輕鬆,並與其他 Gemini Enterprise Agent Platform 功能整合 更具彈性且可自訂,但需要深入瞭解 Ray
費用 費用取決於機器類型、加速器和副本數量 費用取決於基礎架構選擇
專業功能 模型監控、A/B 測試、流量分割、Gemini Enterprise Agent Platform Model Registry 和 Gemini Enterprise Agent Platform Pipelines 整合 進階模型組合、模型集合、自訂推論邏輯、與 Ray 生態系統整合

匯入及初始化 Ray on Agent Platform 用戶端

如果您已連線至 Gemini Enterprise Agent Platform 上的 Ray 叢集,請重新啟動核心並執行下列程式碼。連線時必須使用 runtime_env 變數,才能執行線上推論指令。

import ray
import vertexai

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

# Initialize Agent Platform to retrieve projects for downstream operations.
vertexai.init(staging_bucket=BUCKET_URI)

# Shutdown cluster and reconnect with required dependencies in the runtime_env.
ray.shutdown()

其中:

  • CLUSTER_RESOURCE_NAME:Agent Platform 叢集上的 Ray 完整資源名稱,在專案中不得重複。

  • BUCKET_URI 是用來儲存模型構件的 Cloud Storage bucket。

訓練模型並匯出至 Model Registry

從 Ray 檢查點匯出 Gemini Enterprise Agent Platform 模型,然後將模型上傳至 Model Registry。

TensorFlow

import numpy as np
from ray.air import session, CheckpointConfig, ScalingConfig
from ray.air.config import RunConfig
from ray.train import SyncConfig
from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer
from ray import train
import tensorflow as tf

from vertex_ray.predict import tensorflow

# Required dependencies at runtime
runtime_env = {
  "pip": [
      "ray==2.47.1", # pin the Ray version to prevent it from being overwritten
      "tensorflow",
      "IPython",
      "numpy",
  ],
}

# Initialize  Ray on Agent Platform client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a TensorFlow model.

def create_model():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
  model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
  return model

def train_func(config):
  n = 100
  # Create a fake dataset
  # data   : X - dim = (n, 4)
  # target : Y - dim = (n, 1)
  X = np.random.normal(0, 1, size=(n, 4))
  Y = np.random.uniform(0, 1, size=(n, 1))

  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
  with strategy.scope():
      model = create_model()
      print(model)

  for epoch in range(config["num_epochs"]):
      model.fit(X, Y, batch_size=20)
      tf.saved_model.save(model, "temp/my_model")
      checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model")
      train.report({}, checkpoint=checkpoint)

trainer = TensorflowTrainer(
  train_func,
  train_loop_config={"num_epochs": 5},
  scaling_config=ScalingConfig(num_workers=1),
  run_config=RunConfig(
      storage_path=f'{BUCKET_URI}/ray_results/tensorflow',
      checkpoint_config=CheckpointConfig(
          num_to_keep=1  # Keep all checkpoints.
      ),
      sync_config=SyncConfig(
          sync_artifacts=True,
      ),
  ),
)

# Train the model.
result = trainer.fit()

# Register the trained model to Model Registry.
vertex_model = tensorflow.register_tensorflow(
  result.checkpoint,
)

sklearn

from vertex_ray.predict import sklearn
from ray.train.sklearn import SklearnCheckpoint

vertex_model = sklearn.register_sklearn(
  result.checkpoint,
)

XGBoost

from vertex_ray.predict import xgboost
from ray.train.xgboost import XGBoostTrainer

# Initialize  Ray on Agent Platform client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define an XGBoost model.
train_dataset = ray.data.from_pandas(
pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)]))

run_config = RunConfig(
storage_path=f'{BUCKET_URI}/ray_results/xgboost',
checkpoint_config=CheckpointConfig(
    num_to_keep=1  # Keep all checkpoints.
),
sync_config=SyncConfig(sync_artifacts=True),
)

trainer = XGBoostTrainer(
label_column="y",
params={"objective": "reg:squarederror"},
scaling_config=ScalingConfig(num_workers=3),
datasets={"train": train_dataset},
run_config=run_config,
)
# Train the model.
result = trainer.fit()

# Register the trained model to Model Registry.
vertex_model = xgboost.register_xgboost(
result.checkpoint,
)

PyTorch

  • 將 Ray 檢查點轉換為模型。

  • 建構 model.mar

  • 使用 model.mar 建立 LocalModel。

  • 上傳至 Model Registry。

部署模型以進行線上推論

將模型部署至線上端點。詳情請參閱「將模型部署至端點」一文。

DEPLOYED_NAME = model.display_name + "-endpoint"
TRAFFIC_SPLIT = {"0": 100}
MACHINE_TYPE = "n1-standard-4"

endpoint = vertex_model.deploy(
    deployed_model_display_name=DEPLOYED_NAME,
    traffic_split=TRAFFIC_SPLIT,
    machine_type=MACHINE_TYPE,
)

其中:

  • (選用) DEPLOYED_NAME:已部署模型的顯示名稱。如果您在建立模型時未提供名稱,系統會使用模型的display_name

  • (選用) TRAFFIC_SPLIT:從已部署模型的 ID 到應轉送至該已部署模型的端點流量百分比的對應。如果已部署模型的 ID 未列在這個對應表中,則該模型不會收到任何流量。流量百分比值加總必須為 100,如果端點目前不接受任何流量,地圖必須為空白。部署模型的金鑰為 "0"。例如:{"0": 100}

  • (選用) MACHINE_TYPE指定運算資源

提出推論要求

將推論要求傳送至端點。詳情請參閱「透過自訂訓練模型取得線上推論結果」。

pred_request = [
    [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471],
    [-0.81881499, 0.43874669, -0.25108584, 1.75536031]
]

endpoint.predict(pred_request)

輸出結果應該會類似下列內容:

Prediction(predictions=[0.7891440987586975, 0.5843208432197571],
 deployed_model_id='3829557218101952512',
 model_version_id='1',
 model_resource_name='projects/123456789/locations/us-central1/models/123456789101112',
 explanations=None)

後續步驟