Quando executa uma aplicação Ray no Vertex AI, use o BigQuery como base de dados na nuvem. Esta secção aborda como ler e escrever numa base de dados do BigQuery a partir do seu cluster do Ray no Vertex AI. Os passos nesta secção partem do princípio de que usa o SDK Vertex AI para Python.
Para ler a partir de um conjunto de dados do BigQuery, crie um novo conjunto de dados do BigQuery ou use um conjunto de dados existente.
Importe e inicialize o cliente do Ray no Vertex AI
Se tiver ligação ao cluster do Ray no Vertex AI, reinicie o kernel e execute o seguinte código. A variável runtime_env
é necessária no momento da ligação para executar comandos do BigQuery.
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) runtime_env = { "pip": ["google-cloud-aiplatform[ray]","ray==2.47.1"] } ray.init(address=address, runtime_env=runtime_env)
Leia dados do BigQuery
Ler dados do seu conjunto de dados do BigQuery. Uma tarefa do Ray tem de realizar a operação de leitura.
aiplatform.init(project=PROJECT_ID, location=LOCATION) @ray.remote def run_remotely(): import vertex_ray dataset = DATASET parallelism = PARALLELISM query = QUERY ds = vertex_ray.data.read_bigquery( dataset=dataset, parallelism=parallelism, query=query ) ds.materialize()
Onde:
PROJECT_ID: Google Cloud ID do projeto. Encontre o ID do projeto na página de boas-vindas da Google Cloud consola.
LOCATION: a localização onde o
Dataset
está armazenado. Por exemplo,us-central1
.DATASET: conjunto de dados do BigQuery. Tem de estar no formato
dataset.table
. Defina comoNone
se fornecer uma consulta.PARALLELISM: um número inteiro que influencia o número de tarefas de leitura criadas em paralelo. Podem ser criadas menos streams de leitura do que as que pediu.
QUERY: uma string que contém uma consulta SQL para leitura da base de dados do BigQuery. Defina como
None
se não for necessária nenhuma consulta.
Transforme dados
Atualize e elimine linhas e colunas das suas tabelas do BigQuery através do
pyarrow
ou do pandas
. Se quiser usar transformações pandas
, mantenha o tipo de entrada como pyarrow e converta para pandas
na função definida pelo utilizador (UDF) para poder detetar erros de tipo de conversão pandas
na UDF. Uma tarefa Ray tem de realizar a transformação.
@ray.remote def run_remotely(): # BigQuery Read first import pandas as pd import pyarrow as pa def filter_batch(table: pa.Table) -> pa.Table: df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get) # PANDAS_TRANSFORMATIONS_HERE return pa.Table.from_pandas(df) ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle() ds.materialize() # You can repartition before writing to determine the number of write blocks ds = ds.repartition(4) ds.materialize()
Escreva dados no BigQuery
Insira dados no seu conjunto de dados do BigQuery. Uma tarefa do Ray (Ray Task) tem de executar a escrita.
@ray.remote def run_remotely(): # BigQuery Read and optional data transformation first dataset=DATASET vertex_ray.data.write_bigquery( ds, dataset=dataset )
Onde:
- DATASET: conjunto de dados do BigQuery. O conjunto de dados tem de estar no formato
dataset.table
.