Nachdem Sie ein Modell in einem Ray-Cluster in Vertex AI trainiert haben, stellen Sie es mit dem folgenden Verfahren für Onlineinferenzanfragen bereit:
Exportieren Sie das Modell aus dem Ray-Prüfpunkt.
Laden Sie das Modell in die Vertex AI Model Registry hoch.
Modell auf einem Endpunkt bereitstellen
Stellen Sie Inferenzanfragen.
Bei den Schritten in diesem Abschnitt wird davon ausgegangen, dass Sie das Ray on Vertex AI SDK in einer interaktiven Python-Umgebung verwenden.
Onlineinferenz in Vertex AI und Ray-Inferenz im Vergleich
| Funktion | Onlineinferenz in Vertex AI (empfohlen) | Ray-Inferenz (Ray Serve) |
|---|---|---|
| Skalierbarkeit | Autoscaling basierend auf Traffic (auch für LLM-Modelle hoch skalierbar) | Hoch skalierbar mit verteilten Back-Ends und benutzerdefinierter Ressourcenverwaltung |
| Infrastrukturverwaltung | Vollständig verwaltet von Google Cloud, weniger Betriebsaufwand | Erfordert mehr manuelle Einrichtung und Verwaltung in Ihrer Infrastruktur oder Ihrem Kubernetes-Cluster |
| API/Unterstützte Funktionen | REST- und gRPC-APIs, Online- und Batchinferenzen, Erklärbarkeitsfunktionen, Batching, Caching, Streaming | REST und gRPC APIs, Echtzeit- und Batchinferenz, Modellzusammensetzung, Batching, Caching, Streaming |
| Modellformat | Unterstützt verschiedene Frameworks wie TensorFlow, PyTorch, scikit-learn und XGBoost mit vordefinierten Containern oder einem beliebigen benutzerdefinierten Container | Unterstützt verschiedene Frameworks wie TensorFlow, PyTorch und scikit-learn. |
| Nutzerfreundlichkeit | Einfacher einzurichten und zu verwalten, in andere Vertex AI-Funktionen eingebunden | Flexibler und anpassbarer, erfordert aber fundierte Kenntnisse von Ray |
| Kosten | Die Kosten hängen von den Maschinentypen, Beschleunigern und der Anzahl der Replikate ab. | Die Kosten hängen von Ihren Infrastrukturentscheidungen ab. |
| Spezielle Funktionen | Modellmonitoring, A/B-Tests, Traffic-Aufteilung, Vertex AI Model Registry- und Vertex AI Pipelines-Integration | Erweiterte Modellzusammensetzung, Ensemble-Modelle, benutzerdefinierte Inferenzlogik, Integration in das Ray-Ökosystem |
Ray on Vertex AI-Client importieren und initialisieren
Wenn Sie bereits mit Ihrem Ray on Vertex AI-Cluster verbunden sind, starten Sie den Kernel neu und führen Sie den folgenden Code aus. Die Variable runtime_env ist zum Zeitpunkt der Verbindung erforderlich, um Onlineinferenzbefehle auszuführen.
import ray import vertexai # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) # Initialize Vertex AI to retrieve projects for downstream operations. vertexai.init(staging_bucket=BUCKET_URI) # Shutdown cluster and reconnect with required dependencies in the runtime_env. ray.shutdown()
Wobei:
CLUSTER_RESOURCE_NAME: Der vollständige Ressourcenname für den Ray on Vertex AI Cluster, der in Ihrem Projekt eindeutig sein muss.
BUCKET_URI ist der Cloud Storage-Bucket zum Speichern der Modell artefakte.
Modell trainieren und in die Vertex AI Model Registry exportieren
Exportieren Sie das Vertex AI-Modell aus dem Ray-Prüfpunkt und laden Sie das Modell in die Vertex AI Model Registry hoch.
TensorFlow
import numpy as np from ray.air import session, CheckpointConfig, ScalingConfig from ray.air.config import RunConfig from ray.train import SyncConfig from ray.train.tensorflow import TensorflowCheckpoint, TensorflowTrainer from ray import train import tensorflow as tf from vertex_ray.predict import tensorflow # Required dependencies at runtime runtime_env = { "pip": [ "ray==2.47.1", # pin the Ray version to prevent it from being overwritten "tensorflow", "IPython", "numpy", ], } # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define a TensorFlow model. def create_model(): model = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="linear", input_shape=(4,))]) model.compile(optimizer="Adam", loss="mean_squared_error", metrics=["mse"]) return model def train_func(config): n = 100 # Create a fake dataset # data : X - dim = (n, 4) # target : Y - dim = (n, 1) X = np.random.normal(0, 1, size=(n, 4)) Y = np.random.uniform(0, 1, size=(n, 1)) strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy() with strategy.scope(): model = create_model() print(model) for epoch in range(config["num_epochs"]): model.fit(X, Y, batch_size=20) tf.saved_model.save(model, "temp/my_model") checkpoint = TensorflowCheckpoint.from_saved_model("temp/my_model") train.report({}, checkpoint=checkpoint) trainer = TensorflowTrainer( train_func, train_loop_config={"num_epochs": 5}, scaling_config=ScalingConfig(num_workers=1), run_config=RunConfig( storage_path=f'{BUCKET_URI}/ray_results/tensorflow', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig( sync_artifacts=True, ), ), ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = tensorflow.register_tensorflow( result.checkpoint, )
sklearn
from vertex_ray.predict import sklearn from ray.train.sklearn import SklearnCheckpoint vertex_model = sklearn.register_sklearn( result.checkpoint, )
XGBoost
from vertex_ray.predict import xgboost from ray.train.xgboost import XGBoostTrainer # Initialize Ray on Vertex AI client for remote cluster connection ray.init(address=address, runtime_env=runtime_env) # Define an XGBoost model. train_dataset = ray.data.from_pandas( pd.DataFrame([{"x": x, "y": x + 1} for x in range(32)])) run_config = RunConfig( storage_path=f'{BUCKET_URI}/ray_results/xgboost', checkpoint_config=CheckpointConfig( num_to_keep=1 # Keep all checkpoints. ), sync_config=SyncConfig(sync_artifacts=True), ) trainer = XGBoostTrainer( label_column="y", params={"objective": "reg:squarederror"}, scaling_config=ScalingConfig(num_workers=3), datasets={"train": train_dataset}, run_config=run_config, ) # Train the model. result = trainer.fit() # Register the trained model to Vertex AI Model Registry. vertex_model = xgboost.register_xgboost( result.checkpoint, )
PyTorch
Konvertieren Sie Ray-Prüfpunkte in ein Modell.
Erstellen Sie
model.mar.Erstellen Sie LocalModel mit
model.mar.Laden Sie in Vertex AI Model Registry hoch.
Modell für Onlineinferenzen bereitstellen
Stellen Sie das Modell für den Online-Endpunkt bereit. Weitere Informationen finden Sie unter Modell auf einem Endpunkt bereitstellen.
DEPLOYED_NAME = model.display_name + "-endpoint" TRAFFIC_SPLIT = {"0": 100} MACHINE_TYPE = "n1-standard-4" endpoint = vertex_model.deploy( deployed_model_display_name=DEPLOYED_NAME, traffic_split=TRAFFIC_SPLIT, machine_type=MACHINE_TYPE, )
Dabei gilt:
Optional: DEPLOYED_NAME: Der Anzeigename des bereitgestellten Modells. Wenn Sie bei der Erstellung keinen Namen angeben, verwendet das System den
display_namedes Modells.Optional: TRAFFIC_SPLIT: Eine Zuordnung von der ID eines bereitgestellten Modells zum Prozentsatz des Traffics dieses Endpunkts, der an dieses bereitgestellte Modell weitergeleitet werden soll. Wenn die ID eines bereitgestellten Modells nicht in dieser Zuordnung aufgeführt ist, erhält es keinen Traffic. Die Werte für den Traffic-Prozentsatz müssen zusammen 100 ergeben. Alternativ kann die Zuordnung leer sein, wenn der Endpunkt derzeit keinen Traffic akzeptiert. Der Schlüssel für das bereitzustellende Modell ist
"0". Beispiel:{"0": 100}Optional: MACHINE_TYPE: Geben Sie die Compute-Ressourcen an.
Inferenzanfrage stellen
Senden Sie eine Inferenzanfrage an den Endpunkt. Weitere Informationen finden Sie unter Onlineinferenzen von einem benutzerdefinierten trainierten Modell abrufen.
pred_request = [ [ 1.7076793 , 0.23412449, 0.95170785, -0.10901471], [-0.81881499, 0.43874669, -0.25108584, 1.75536031] ] endpoint.predict(pred_request)
Die Ausgabe sollte in etwa so aussehen:
Prediction(predictions=[0.7891440987586975, 0.5843208432197571], deployed_model_id='3829557218101952512', model_version_id='1', model_resource_name='projects/123456789/locations/us-central1/models/123456789101112', explanations=None)