Use o Ray no Vertex AI com o BigQuery

Quando executa uma aplicação Ray no Vertex AI, use o BigQuery como base de dados na nuvem. Esta secção aborda como ler e escrever numa base de dados do BigQuery a partir do seu cluster do Ray no Vertex AI. Os passos nesta secção partem do princípio de que usa o SDK Vertex AI para Python.

Para ler a partir de um conjunto de dados do BigQuery, crie um novo conjunto de dados do BigQuery ou use um conjunto de dados existente.

Importe e inicialize o cliente do Ray no Vertex AI

Se tiver ligação ao cluster do Ray no Vertex AI, reinicie o kernel e execute o seguinte código. A variável runtime_env é necessária no momento da ligação para executar comandos do BigQuery.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.47.1"]
  }

ray.init(address=address, runtime_env=runtime_env)

Leia dados do BigQuery

Ler dados do seu conjunto de dados do BigQuery. Uma tarefa do Ray tem de realizar a operação de leitura.

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

Onde:

Transforme dados

Atualize e elimine linhas e colunas das suas tabelas do BigQuery através do pyarrow ou do pandas. Se quiser usar transformações pandas, mantenha o tipo de entrada como pyarrow e converta para pandas na função definida pelo utilizador (UDF) para poder detetar erros de tipo de conversão pandas na UDF. Uma tarefa Ray tem de realizar a transformação.

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

Escreva dados no BigQuery

Insira dados no seu conjunto de dados do BigQuery. Uma tarefa do Ray (Ray Task) tem de executar a escrita.

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

Onde:

  • DATASET: conjunto de dados do BigQuery. O conjunto de dados tem de estar no formato dataset.table.

O que se segue?