Ao executar um aplicativo do Ray na Vertex AI, use o BigQuery como banco de dados na nuvem. Esta seção aborda como ler e gravar em um banco de dados do BigQuery a partir do cluster do Ray na Vertex AI. As etapas desta seção pressupõem que você use o SDK da Vertex AI para Python.
Para ler informações de um conjunto de dados do BigQuery, crie um novo conjunto de dados do BigQuery ou use um atual.
Importar e inicializar o Ray no cliente da Vertex AI
Se você já estiver conectado ao cluster do Ray na Vertex AI, reinicie o
kernel e execute o código a seguir. A variável runtime_env
é necessária no momento da conexão, para executar comandos do BigQuery.
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) runtime_env = { "pip": ["google-cloud-aiplatform[ray]","ray==2.47.1"] } ray.init(address=address, runtime_env=runtime_env)
Ler dados do BigQuery
Leia os dados do conjunto de dados do BigQuery. Uma tarefa do Ray precisa executar a operação de leitura.
aiplatform.init(project=PROJECT_ID, location=LOCATION) @ray.remote def run_remotely(): import vertex_ray dataset = DATASET parallelism = PARALLELISM query = QUERY ds = vertex_ray.data.read_bigquery( dataset=dataset, parallelism=parallelism, query=query ) ds.materialize()
Em que:
PROJECT_ID:ID do projeto Google Cloud . Encontre o ID do projeto na página de boas-vindas do console do Google Cloud .
LOCATION: o local onde o
Dataset
está armazenado. Por exemplo,us-central1
.DATASET: conjunto de dados do BigQuery. Ele precisa estar no formato
dataset.table
. Defina comoNone
se você fornecer uma consulta.PARALLELISM: um número inteiro que influencia quantas tarefas de leitura são criadas em paralelo. Pode haver menos streams de leitura criados do que solicitado.
QUERY: uma string contendo uma consulta SQL para leitura do banco de dados do BigQuery. Defina como
None
se nenhuma consulta for necessária.
Transformar dados
Atualize e exclua linhas e colunas das tabelas do BigQuery usando pyarrow
ou pandas
. Se você quiser usar transformações pandas
,
mantenha o tipo de entrada como pyarrow e converta em pandas
na função definida pelo usuário (UDF). Assim, será possível capturar erros de tipo de conversão pandas
no UDF. Uma tarefa do Ray precisa realizar a transformação.
@ray.remote def run_remotely(): # BigQuery Read first import pandas as pd import pyarrow as pa def filter_batch(table: pa.Table) -> pa.Table: df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get) # PANDAS_TRANSFORMATIONS_HERE return pa.Table.from_pandas(df) ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle() ds.materialize() # You can repartition before writing to determine the number of write blocks ds = ds.repartition(4) ds.materialize()
Gravar dados no BigQuery
Insira dados no seu conjunto de dados do BigQuery. Uma tarefa do Ray precisa realizar a gravação.
@ray.remote def run_remotely(): # BigQuery Read and optional data transformation first dataset=DATASET vertex_ray.data.write_bigquery( ds, dataset=dataset )
Em que:
- DATASET: conjunto de dados do BigQuery. O conjunto de dados precisa estar no formato
dataset.table
.