Cuando ejecutes una aplicación de Ray en Vertex AI, usa BigQuery como tu base de datos en la nube. En esta sección, se explica cómo leer y escribir en una base de datos de BigQuery desde tu clúster de Ray en Vertex AI. En los pasos de esta sección, se supone que usas el SDK de Vertex AI para Python.
Para leer desde un conjunto de datos de BigQuery, crea un conjunto de datos nuevo de BigQuery o usa uno existente.
Importa e inicializa el cliente de Ray en Vertex AI
Si ya estás conectado a tu clúster de Ray en Vertex AI, reinicia tu kernel y ejecuta el siguiente código. La variable runtime_env
es necesaria en el momento de la conexión para ejecutar comandos de BigQuery.
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) runtime_env = { "pip": ["google-cloud-aiplatform[ray]","ray==2.47.1"] } ray.init(address=address, runtime_env=runtime_env)
Leer datos desde BigQuery
Lee datos de tu conjunto de datos de BigQuery. Una tarea de Ray debe realizar la operación de lectura.
aiplatform.init(project=PROJECT_ID, location=LOCATION) @ray.remote def run_remotely(): import vertex_ray dataset = DATASET parallelism = PARALLELISM query = QUERY ds = vertex_ray.data.read_bigquery( dataset=dataset, parallelism=parallelism, query=query ) ds.materialize()
Aquí:
PROJECT_ID: Google Cloud ID del proyecto. Busca el ID del proyecto en la página de bienvenida de la consola de Google Cloud .
LOCATION: Es la ubicación en la que se almacena el
Dataset
. Por ejemplo,us-central1
DATASET: Es el conjunto de datos de BigQuery. Debe tener el formato
dataset.table
. Se establece enNone
si proporcionas una búsqueda.PARALLELISM: Es un número entero que influye en cuántas tareas de lectura se crean en paralelo. Es posible que haya menos transmisiones de lectura creadas de las que solicitaste.
QUERY: Es una cadena que contiene una consulta en SQL para leer desde la base de datos de BigQuery. Se establece en
None
si no se requiere ninguna búsqueda.
Transforma los datos
Actualiza y borra filas y columnas de tus tablas de BigQuery con pyarrow
o pandas
. Si deseas usar las transformaciones de pandas
, mantén el tipo de entrada como pyarrow y conviértelo a pandas
en la función definida por el usuario (UDF) para poder detectar cualquier error de tipo de conversión de pandas
en el UDF. Una tarea de Ray debe realizar la transformación.
@ray.remote def run_remotely(): # BigQuery Read first import pandas as pd import pyarrow as pa def filter_batch(table: pa.Table) -> pa.Table: df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get) # PANDAS_TRANSFORMATIONS_HERE return pa.Table.from_pandas(df) ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle() ds.materialize() # You can repartition before writing to determine the number of write blocks ds = ds.repartition(4) ds.materialize()
Escribir datos a BigQuery
Inserta datos en tu conjunto de datos de BigQuery. Una tarea de Ray debe realizar la escritura.
@ray.remote def run_remotely(): # BigQuery Read and optional data transformation first dataset=DATASET vertex_ray.data.write_bigquery( ds, dataset=dataset )
Aquí:
- DATASET: Es el conjunto de datos de BigQuery. El conjunto de datos debe tener el formato
dataset.table
.