Implementa un modelo en Gemini Enterprise Agent Platform para obtener inferencias

Después de entrenar un modelo en un clúster de Ray en Gemini Enterprise Agent Platform, implementa el modelo para las solicitudes de inferencia en línea con el siguiente proceso:

Antes de comenzar, asegúrate de leer la descripción general de Ray en Agent Platform y configurar todas las herramientas de requisitos previos que necesitas.

En los pasos de esta sección, se supone que usas el SDK de Ray en Agent Platform en un entorno interactivo de Python.

Comparación entre la inferencia en línea de Gemini Enterprise Agent Platform y la inferencia de Ray

Función Inferencia en línea de Gemini Enterprise Agent Platform (recomendada) Inferencia de Ray (Ray Serve)
Escalabilidad Ajuste de escala automático basado en el tráfico (altamente escalable incluso para modelos de LLM) Altamente escalable con back-ends distribuidos y administración de recursos personalizada
Administración de la infraestructura Completamente administrado por Google Cloud, menor sobrecarga operativa Requiere más configuración y administración manuales en tu infraestructura o clúster de Kubernetes
API/Funciones compatibles APIs de REST y gRPC, inferencias en línea y por lotes, funciones de explicabilidad, procesamiento por lotes, almacenamiento en caché y transmisión APIs de REST y gRPC, inferencia en tiempo real y por lotes, composición de modelos, procesamiento por lotes, almacenamiento en caché, transmisión
Formato del modelo Admite varios frameworks, como TensorFlow, PyTorch, scikit-learn y XGBoost, con contenedores compilados previamente o cualquier contenedor personalizado Admite varios frameworks, como TensorFlow, PyTorch y scikit-learn.
Facilidad de uso Es más fácil de configurar y administrar, y se integra con otras funciones de Gemini Enterprise Agent Platform. Más flexible y personalizable, pero requiere un conocimiento más profundo de Ray
Costo El costo depende de los tipos de máquinas, los aceleradores y la cantidad de réplicas. El costo depende de las opciones de infraestructura que elijas
Funciones especializadas Supervisión de modelos, pruebas A/B, división del tráfico, integración de Model Registry y Pipelines de Gemini Enterprise Agent Platform Composición de modelos avanzada, modelos de conjunto, lógica de inferencia personalizada, integración con el ecosistema de Ray

Importa e inicializa el cliente de Ray on Agent Platform

Si ya estás conectado a tu clúster de Ray en Gemini Enterprise Agent Platform, reinicia tu kernel y ejecuta el siguiente código. La variable runtime_env es necesaria en el momento de la conexión para ejecutar comandos de inferencia en línea.

import ray
import vertexai

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

# Initialize Agent Platform to retrieve projects for downstream operations.
vertexai.init(staging_bucket=BUCKET_URI)

# Shutdown cluster and reconnect with required dependencies in the runtime_env.
ray.shutdown()

Aquí:

  • CLUSTER_RESOURCE_NAME: Es el nombre completo del recurso para el clúster de Ray en Agent Platform que debe ser único en todo el proyecto.

  • BUCKET_URI es el bucket de Cloud Storage para almacenar los artefactos del modelo.

Entrena y exporta el modelo a Model Registry

Exporta el modelo de Gemini Enterprise Agent Platform desde el punto de control de Ray y súbelo a Model Registry.

TensorFlow

import numpy as np
from ray.air import session, CheckpointConfig, ScalingConfig
from ray.air.config import RunConfig
from ray.train import SyncConfig
from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer
from ray import train
import tensorflow as tf

from vertex_ray.predict import tensorflow

# Required dependencies at runtime
runtime_env = {
  "pip": [
      "ray==2.47.1", # pin the Ray version to prevent it from being overwritten
      "tensorflow",
      "IPython",
      "numpy",
  ],
}

# Initialize  Ray on Agent Platform client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define a TensorFlow model.

def create_model():
  model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))])
  model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"])
  return model

def train_func(config):
  n = 100
  # Create a fake dataset
  # data   : X - dim = (n, 4)
  # target : Y - dim = (n, 1)
  X = np.random.normal(0, 1, size=(n, 4))
  Y = np.random.uniform(0, 1, size=(n, 1))

  strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
  with strategy.scope():
      model = create_model()
      print(model)

  for epoch in range(config["num_epochs"]):
      model.fit(X, Y, batch_size=20)
      tf.saved_model.save(model, "temp/my_model")
      checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model")
      train.report({}, checkpoint=checkpoint)

trainer = TensorflowTrainer(
  train_func,
  train_loop_config={"num_epochs": 5},
  scaling_config=ScalingConfig(num_workers=1),
  run_config=RunConfig(
      storage_path=f'{BUCKET_URI}/ray_results/tensorflow',
      checkpoint_config=CheckpointConfig(
          num_to_keep=1  # Keep all checkpoints.
      ),
      sync_config=SyncConfig(
          sync_artifacts=True,
      ),
  ),
)

# Train the model.
result = trainer.fit()

# Register the trained model to Model Registry.
vertex_model = tensorflow.register_tensorflow(
  result.checkpoint,
)

sklearn

from vertex_ray.predict import sklearn
from ray.train.sklearn import SklearnCheckpoint

vertex_model = sklearn.register_sklearn(
  result.checkpoint,
)

XGBoost

from vertex_ray.predict import xgboost
from ray.train.xgboost import XGBoostTrainer

# Initialize  Ray on Agent Platform client for remote cluster connection
ray.init(address=address, runtime_env=runtime_env)

# Define an XGBoost model.
train_dataset = ray.data.from_pandas(
pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)]))

run_config = RunConfig(
storage_path=f'{BUCKET_URI}/ray_results/xgboost',
checkpoint_config=CheckpointConfig(
    num_to_keep=1  # Keep all checkpoints.
),
sync_config=SyncConfig(sync_artifacts=True),
)

trainer = XGBoostTrainer(
label_column="y",
params={"objective": "reg:squarederror"},
scaling_config=ScalingConfig(num_workers=3),
datasets={"train": train_dataset},
run_config=run_config,
)
# Train the model.
result = trainer.fit()

# Register the trained model to Model Registry.
vertex_model = xgboost.register_xgboost(
result.checkpoint,
)

PyTorch

  • Convierte los puntos de control de Ray en un modelo.

  • Compila model.mar.

  • Crea LocalModel con model.mar.

  • Sube el modelo a Model Registry.

Implementa el modelo para las inferencias en línea

Implementa el modelo en el extremo en línea: Para obtener más información, consulta Implementa el modelo en un extremo.

DEPLOYED_NAME = model.display_name + "-endpoint"
TRAFFIC_SPLIT = {"0": 100}
MACHINE_TYPE = "n1-standard-4"

endpoint = vertex_model.deploy(
    deployed_model_display_name=DEPLOYED_NAME,
    traffic_split=TRAFFIC_SPLIT,
    machine_type=MACHINE_TYPE,
)

Donde:

  • DEPLOYED_NAME: el nombre visible del modelo implementado (opcional). Si no proporcionas un nombre en el momento de la creación, el sistema usa la display_name del modelo.

  • TRAFFIC_SPLIT: un mapa del ID de un modelo implementado al porcentaje de tráfico de este extremo que se debe reenviar a ese modelo implementado (opcional). Si el ID de un modelo implementado no aparece en este mapa, no recibe tráfico. Los valores de porcentaje de tráfico deben sumar hasta 100, o el mapa debe estar vacío si el extremo no acepta tráfico en este momento. La clave del modelo que se está implementando es "0". Por ejemplo, {"0": 100}.

  • MACHINE_TYPE: especifica los recursos de procesamiento (opcional).

Realiza una solicitud de inferencia

Envía una solicitud de inferencia al extremo. Para obtener más información, consulta Cómo obtener inferencias en línea a partir de un modelo entrenado personalizado.

pred_request = [
    [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471],
    [-0.81881499, 0.43874669, -0.25108584, 1.75536031]
]

endpoint.predict(pred_request)

Deberías obtener un resultado como el siguiente:

Prediction(predictions=[0.7891440987586975, 0.5843208432197571],
 deployed_model_id='3829557218101952512',
 model_version_id='1',
 model_resource_name='projects/123456789/locations/us-central1/models/123456789101112',
 explanations=None)

¿Qué sigue?