在 Gemini Enterprise Agent Platform 的 Ray 叢集上訓練模型後,請按照下列程序部署模型,以處理線上推論要求:
從 Ray 檢查點匯出模型。
將模型上傳至 Model Registry。
將模型部署至端點。
發出推論要求。
本節中的步驟假設您在互動式 Python 環境中使用 Agent Platform SDK 上的 Ray。
Gemini Enterprise Agent Platform 線上推論與 Ray 推論比較
| 功能 | Gemini Enterprise Agent Platform 線上推論 (建議) | Ray 推論 (Ray Serve) |
|---|---|---|
| 擴充性 | 依據流量自動調度資源 (即使是 LLM 模型也能高度擴充) | 可透過分散式後端和自訂資源管理功能大幅擴充 |
| Infrastructure Management | 由 Google Cloud全代管,可減少作業負擔 | 需要在基礎架構或 Kubernetes 叢集上進行更多手動設定和管理 |
| API/支援的功能 | REST 和 gRPC API、線上和批次推論、可解釋性功能、批次處理、快取、串流 | REST 和 gRPC API、即時和批次推論、模型組合、批次處理、快取、串流 |
| 模型格式 | 支援 TensorFlow、PyTorch、scikit-learn、XGBoost 等各種架構,可使用預先建構的容器或任何自訂容器 | 支援 TensorFlow、PyTorch、scikit-learn 等各種架構。 |
| 易於使用 | 設定和管理更輕鬆,並與其他 Gemini Enterprise Agent Platform 功能整合 | 更具彈性且可自訂,但需要深入瞭解 Ray |
| 費用 | 費用取決於機器類型、加速器和副本數量 | 費用取決於基礎架構選擇 |
| 專業功能 | 模型監控、A/B 測試、流量分割、Gemini Enterprise Agent Platform Model Registry 和 Gemini Enterprise Agent Platform Pipelines 整合 | 進階模型組合、模型集合、自訂推論邏輯、與 Ray 生態系統整合 |
匯入及初始化 Ray on Agent Platform 用戶端
如果您已連線至 Gemini Enterprise Agent Platform 上的 Ray 叢集,請重新啟動核心並執行下列程式碼。連線時必須使用 runtime_env 變數,才能執行線上推論指令。
import ray import vertexai # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) # Initialize Agent Platform to retrieve projects for downstream operations. vertexai.init(staging_bucket=BUCKET_URI) # Shutdown cluster and reconnect with required dependencies in the runtime_env. ray.shutdown()
其中:
CLUSTER_RESOURCE_NAME:Agent Platform 叢集上的 Ray 完整資源名稱,在專案中不得重複。
BUCKET_URI 是用來儲存模型構件的 Cloud Storage bucket。
訓練模型並匯出至 Model Registry
從 Ray 檢查點匯出 Gemini Enterprise Agent Platform 模型,然後將模型上傳至 Model Registry。
TensorFlow
import numpy as np from ray.air import session, CheckpointConfig, ScalingConfig from ray.air.config import RunConfig from ray.train import SyncConfig from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer from ray import train import tensorflow as tf from vertex_ray.predict import tensorflow # Required dependencies at runtime runtime_env = { "pip": [ "ray==2.47.1", # pin the Ray version to prevent it from being overwritten "tensorflow", "IPython", "numpy", ], } # Initialize Ray on Agent Platform client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a TensorFlow model. def create_model(): model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]) model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"]) return model def train_func(config): n = 100 # Create a fake dataset # data : X - dim = (n, 4) # target : Y - dim = (n, 1) X = np.random.normal(0, 1, size=(n, 4)) Y = np.random.uniform(0, 1, size=(n, 1)) strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() with strategy.scope(): model = create_model() print(model) for epoch in range(config["num_epochs"]): model.fit(X, Y, batch_size=20) tf.saved_model.save(model, "temp/my_model") checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model") train.report({}, checkpoint=checkpoint) trainer = TensorflowTrainer( train_func, train_loop_config={"num_epochs": 5}, scaling_config=ScalingConfig(num_workers=1), run_config=RunConfig( storage_path=f'{BUCKET_URI}/ray_results/tensorflow', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig( sync_artifacts=True, ), ), ) # Train the model. result = trainer.fit() # Register the trained model to Model Registry. vertex_model = tensorflow.register_tensorflow( result.checkpoint, )
sklearn
from vertex_ray.predict import sklearn from ray.train.sklearn import SklearnCheckpoint vertex_model = sklearn.register_sklearn( result.checkpoint, )
XGBoost
from vertex_ray.predict import xgboost from ray.train.xgboost import XGBoostTrainer # Initialize Ray on Agent Platform client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define an XGBoost model. train_dataset = ray.data.from_pandas( pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)])) run_config = RunConfig( storage_path=f'{BUCKET_URI}/ray_results/xgboost', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig(sync_artifacts=True), ) trainer = XGBoostTrainer( label_column="y", params={"objective": "reg:squarederror"}, scaling_config=ScalingConfig(num_workers=3), datasets={"train": train_dataset}, run_config=run_config, ) # Train the model. result = trainer.fit() # Register the trained model to Model Registry. vertex_model = xgboost.register_xgboost( result.checkpoint, )
PyTorch
將 Ray 檢查點轉換為模型。
建構
model.mar。使用
model.mar建立 LocalModel。上傳至 Model Registry。
部署模型以進行線上推論
將模型部署至線上端點。詳情請參閱「將模型部署至端點」一文。
DEPLOYED_NAME = model.display_name + "-endpoint" TRAFFIC_SPLIT = {"0": 100} MACHINE_TYPE = "n1-standard-4" endpoint = vertex_model.deploy( deployed_model_display_name=DEPLOYED_NAME, traffic_split=TRAFFIC_SPLIT, machine_type=MACHINE_TYPE, )
其中:
(選用) DEPLOYED_NAME:已部署模型的顯示名稱。如果您在建立模型時未提供名稱,系統會使用模型的
display_name。(選用) TRAFFIC_SPLIT:從已部署模型的 ID 到應轉送至該已部署模型的端點流量百分比的對應。如果已部署模型的 ID 未列在這個對應表中,則該模型不會收到任何流量。流量百分比值加總必須為 100,如果端點目前不接受任何流量,地圖必須為空白。部署模型的金鑰為
"0"。例如:{"0": 100}。(選用) MACHINE_TYPE:指定運算資源。
提出推論要求
將推論要求傳送至端點。詳情請參閱「透過自訂訓練模型取得線上推論結果」。
pred_request = [ [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471], [-0.81881499, 0.43874669, -0.25108584, 1.75536031] ] endpoint.predict(pred_request)
輸出結果應該會類似下列內容:
Prediction(predictions=[0.7891440987586975, 0.5843208432197571], deployed_model_id='3829557218101952512', model_version_id='1', model_resource_name='projects/123456789/locations/us-central1/models/123456789101112', explanations=None)