Wenn Sie eine Ray-Anwendung auf der Gemini Enterprise Agent Platform ausführen, verwenden Sie BigQuery als Cloud-Datenbank. In diesem Abschnitt wird beschrieben, wie Sie über Ihren Ray-Cluster auf der Gemini Enterprise Agent Platform aus einer BigQuery-Datenbank lesen und in diese schreiben. Bei den Schritten in diesem Abschnitt wird davon ausgegangen, dass Sie das Agent Platform SDK für Python verwenden.
Wenn Sie aus einem BigQuery-Dataset lesen möchten, erstellen Sie ein neues BigQuery-Dataset oder verwenden Sie ein vorhandenes Dataset.
Ray on Agent Platform-Client importieren und initialisieren
Wenn Sie mit Ihrem Ray-Cluster auf der Gemini Enterprise Agent Platform verbunden sind, starten Sie den Kernel neu und führen Sie den folgenden Code aus. Die Variable runtime_env ist beim Herstellen der Verbindung erforderlich, um BigQuery-Befehle auszuführen.
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) runtime_env = { "pip": ["google-cloud-aiplatform[ray]","ray==2.47.1"] } ray.init(address=address, runtime_env=runtime_env)
Daten aus BigQuery lesen
Lesen Sie Daten aus Ihrem BigQuery-Dataset. Der Lesevorgang muss von einer Ray-Aufgabe ausgeführt werden.
aiplatform.init(project=PROJECT_ID, location=LOCATION) @ray.remote def run_remotely(): import vertex_ray dataset = DATASET parallelism = PARALLELISM query = QUERY ds = vertex_ray.data.read_bigquery( dataset=dataset, parallelism=parallelism, query=query ) ds.materialize()
Wobei:
PROJECT_ID: Google Cloud Projekt-ID. Die Projekt-ID finden Sie auf der Google Cloud Console Begrüßungs seite.
LOCATION: Der Speicherort des
Dataset. Beispiel:us-central1.DATASET: BigQuery-Dataset. Es muss das Format
dataset.tablehaben. Legen Sie es aufNonefest, wenn Sie eine Abfrage angeben.PARALLELISM: Eine Ganzzahl, die beeinflusst, wie viele Lesevorgänge parallel erstellt werden. Es werden möglicherweise weniger Lesestreams erstellt, als Sie anfordern.
QUERY: Ein String mit einer SQL-Abfrage zum Lesen aus der BigQuery-Datenbank. Legen Sie ihn auf
Nonefest, wenn keine Abfrage erforderlich ist.
Daten transformieren
Aktualisieren und löschen Sie Zeilen und Spalten aus Ihren BigQuery-Tabellen mit pyarrow oder pandas. Wenn Sie pandas-Transformationen verwenden möchten, behalten Sie den Eingabetyp als „pyarrow“ bei und konvertieren Sie ihn innerhalb der benutzerdefinierten Funktion (User-Defined Function, UDF) in pandas, damit Sie pandas-Konvertierungstypfehler innerhalb der UDF erkennen können. Die Transformation muss von einer
Ray-Aufgabe ausgeführt werden.
@ray.remote def run_remotely(): # BigQuery Read first import pandas as pd import pyarrow as pa def filter_batch(table: pa.Table) -> pa.Table: df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get) # PANDAS_TRANSFORMATIONS_HERE return pa.Table.from_pandas(df) ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle() ds.materialize() # You can repartition before writing to determine the number of write blocks ds = ds.repartition(4) ds.materialize()
Daten in BigQuery schreiben
Fügen Sie Daten in Ihr BigQuery-Dataset ein. Der Schreibvorgang muss von einer Ray-Aufgabe ausgeführt werden.
@ray.remote def run_remotely(): # BigQuery Read and optional data transformation first dataset=DATASET vertex_ray.data.write_bigquery( ds, dataset=dataset )
Wobei:
- DATASET: BigQuery-Dataset. Das Dataset muss das Format
dataset.tablehaben.
Nächste Schritte
Modell auf der Gemini Enterprise Agent Platform bereitstellen und Vorhersagen abrufen
Logs für Ihren Ray-Cluster auf der Gemini Enterprise Agent Platform ansehen