Ray in Agent Platform mit BigQuery verwenden

Wenn Sie eine Ray-Anwendung auf der Gemini Enterprise Agent Platform ausführen, verwenden Sie BigQuery als Cloud-Datenbank. In diesem Abschnitt wird beschrieben, wie Sie über Ihren Ray-Cluster auf der Gemini Enterprise Agent Platform aus einer BigQuery-Datenbank lesen und in diese schreiben. Bei den Schritten in diesem Abschnitt wird davon ausgegangen, dass Sie das Agent Platform SDK für Python verwenden.

Wenn Sie aus einem BigQuery-Dataset lesen möchten, erstellen Sie ein neues BigQuery-Dataset oder verwenden Sie ein vorhandenes Dataset.

Ray on Agent Platform-Client importieren und initialisieren

Wenn Sie mit Ihrem Ray-Cluster auf der Gemini Enterprise Agent Platform verbunden sind, starten Sie den Kernel neu und führen Sie den folgenden Code aus. Die Variable runtime_env ist beim Herstellen der Verbindung erforderlich, um BigQuery-Befehle auszuführen.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.47.1"]
  }

ray.init(address=address, runtime_env=runtime_env)

Daten aus BigQuery lesen

Lesen Sie Daten aus Ihrem BigQuery-Dataset. Der Lesevorgang muss von einer Ray-Aufgabe ausgeführt werden.

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

Wobei:

  • PROJECT_ID: Google Cloud Projekt-ID. Die Projekt-ID finden Sie auf der Google Cloud Console Begrüßungs seite.

  • LOCATION: Der Speicherort des Dataset. Beispiel: us-central1.

  • DATASET: BigQuery-Dataset. Es muss das Format dataset.table haben. Legen Sie es auf None fest, wenn Sie eine Abfrage angeben.

  • PARALLELISM: Eine Ganzzahl, die beeinflusst, wie viele Lesevorgänge parallel erstellt werden. Es werden möglicherweise weniger Lesestreams erstellt, als Sie anfordern.

  • QUERY: Ein String mit einer SQL-Abfrage zum Lesen aus der BigQuery-Datenbank. Legen Sie ihn auf None fest, wenn keine Abfrage erforderlich ist.

Daten transformieren

Aktualisieren und löschen Sie Zeilen und Spalten aus Ihren BigQuery-Tabellen mit pyarrow oder pandas. Wenn Sie pandas-Transformationen verwenden möchten, behalten Sie den Eingabetyp als „pyarrow“ bei und konvertieren Sie ihn innerhalb der benutzerdefinierten Funktion (User-Defined Function, UDF) in pandas, damit Sie pandas-Konvertierungstypfehler innerhalb der UDF erkennen können. Die Transformation muss von einer Ray-Aufgabe ausgeführt werden.

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

Daten in BigQuery schreiben

Fügen Sie Daten in Ihr BigQuery-Dataset ein. Der Schreibvorgang muss von einer Ray-Aufgabe ausgeführt werden.

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

Wobei:

  • DATASET: BigQuery-Dataset. Das Dataset muss das Format dataset.table haben.

Nächste Schritte