Utilizzare Ray on Agent Platform con BigQuery

Quando esegui un'applicazione Ray sulla piattaforma agentica Gemini Enterprise, utilizza BigQuery come database cloud. Questa sezione spiega come leggere e scrivere in un database BigQuery da il cluster Ray sulla piattaforma agentica Gemini Enterprise. I passaggi descritti in questa sezione presuppongono l'utilizzo dell'SDK Agent Platform Python.

Per leggere da un set di dati BigQuery, crea un nuovo set di dati BigQuery o utilizzane uno esistente.

Importare e inizializzare Ray sul client della piattaforma agentica

Se hai eseguito la connessione al cluster Ray sulla piattaforma agentica Gemini Enterprise, riavvia il kernel ed esegui il seguente codice. La variabile runtime_env è necessaria al momento della connessione per eseguire i comandi BigQuery.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.47.1"]
  }

ray.init(address=address, runtime_env=runtime_env)

Leggere i dati da BigQuery

Leggi i dati dal set di dati BigQuery. Un'attività Ray deve eseguire l'operazione di lettura.

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

Dove:

  • PROJECT_ID: Google Cloud l'ID progetto. Trova l'ID progetto nella Google Cloud console di benvenuto pagina.

  • LOCATION: la località in cui è archiviato il Dataset. Ad esempio, us-central1.

  • DATASET: il set di dati BigQuery. Deve essere nel formato dataset.table. Imposta su None se fornisci una query.

  • PARALLELISM: un numero intero che influenza il numero di attività di lettura create in parallelo. Il numero di stream di lettura creati potrebbe essere inferiore a quello richiesto.

  • QUERY: una stringa contenente una query SQL per leggere dal database BigQuery. Imposta su None se non è richiesta alcuna query.

Trasformare i dati

Aggiorna ed elimina righe e colonne dalle tabelle BigQuery utilizzando pyarrow o pandas. Se vuoi utilizzare le trasformazioni pandas, mantieni il tipo di input come pyarrow ed esegui la conversione in pandas all'interno della funzione definita dall'utente (UDF) in modo da poter rilevare eventuali errori di tipo di conversione pandas all'interno della UDF. Un'attività Ray deve eseguire la trasformazione.

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

Scrivere i dati in BigQuery

Inserisci i dati nel set di dati BigQuery. Un'attività Ray deve eseguire la scrittura.

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

Dove:

  • DATASET: il set di dati BigQuery. Il set di dati deve essere nel formato dataset.table.

Passaggi successivi