Usar o Ray na Vertex AI com o BigQuery

Ao executar um aplicativo do Ray na Vertex AI, use o BigQuery como banco de dados na nuvem. Esta seção aborda como ler e gravar em um banco de dados do BigQuery a partir do cluster do Ray na Vertex AI. As etapas desta seção pressupõem que você use o SDK da Vertex AI para Python.

Para ler informações de um conjunto de dados do BigQuery, crie um novo conjunto de dados do BigQuery ou use um atual.

Importar e inicializar o Ray no cliente da Vertex AI

Se você já estiver conectado ao cluster do Ray na Vertex AI, reinicie o kernel e execute o código a seguir. A variável runtime_env é necessária no momento da conexão, para executar comandos do BigQuery.

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.47.1"]
  }

ray.init(address=address, runtime_env=runtime_env)

Ler dados do BigQuery

Leia os dados do conjunto de dados do BigQuery. Uma tarefa do Ray precisa executar a operação de leitura.

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

Em que:

  • PROJECT_ID:ID do projeto Google Cloud . Encontre o ID do projeto na página de boas-vindas do console do Google Cloud .

  • LOCATION: o local onde o Dataset está armazenado. Por exemplo, us-central1.

  • DATASET: conjunto de dados do BigQuery. Ele precisa estar no formato dataset.table. Defina como None se você fornecer uma consulta.

  • PARALLELISM: um número inteiro que influencia quantas tarefas de leitura são criadas em paralelo. Pode haver menos streams de leitura criados do que solicitado.

  • QUERY: uma string contendo uma consulta SQL para leitura do banco de dados do BigQuery. Defina como None se nenhuma consulta for necessária.

Transformar dados

Atualize e exclua linhas e colunas das tabelas do BigQuery usando pyarrow ou pandas. Se você quiser usar transformações pandas, mantenha o tipo de entrada como pyarrow e converta em pandas na função definida pelo usuário (UDF). Assim, será possível capturar erros de tipo de conversão pandas no UDF. Uma tarefa do Ray precisa realizar a transformação.

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

Gravar dados no BigQuery

Insira dados no seu conjunto de dados do BigQuery. Uma tarefa do Ray precisa realizar a gravação.

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

Em que:

  • DATASET: conjunto de dados do BigQuery. O conjunto de dados precisa estar no formato dataset.table.

A seguir