Lorsque vous exécutez une application Ray sur Vertex AI, utilisez BigQuery comme base de données cloud. Cette section explique comment lire et écrire dans une base de données BigQuery à partir de votre cluster Ray sur Vertex AI. Les étapes de cette section supposent que vous utilisez le SDK Vertex AI pour Python.
Pour lire les données d'un ensemble de données BigQuery, créez un ensemble de données BigQuery ou utilisez un ensemble de données existant.
Importer et initialiser le client Ray sur Vertex AI
Si vous êtes connecté à votre cluster Ray sur Vertex AI, redémarrez votre kernel et exécutez le code suivant. La variable runtime_env
est nécessaire au moment de la connexion pour exécuter les commandes BigQuery.
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) runtime_env = { "pip": ["google-cloud-aiplatform[ray]","ray==2.47.1"] } ray.init(address=address, runtime_env=runtime_env)
Lire des données de BigQuery
Lisez les données depuis votre ensemble de données BigQuery Une tâche Ray doit effectuer l'opération de lecture.
aiplatform.init(project=PROJECT_ID, location=LOCATION) @ray.remote def run_remotely(): import vertex_ray dataset = DATASET parallelism = PARALLELISM query = QUERY ds = vertex_ray.data.read_bigquery( dataset=dataset, parallelism=parallelism, query=query ) ds.materialize()
Où :
PROJECT_ID : ID du projet Google Cloud . Trouvez l'ID du projet sur la page d'accueil de la console Google Cloud .
LOCATION : emplacement de stockage de
Dataset
. Par exemple,us-central1
.DATASET : ensemble de données BigQuery. Il doit être au format
dataset.table
. Définissez surNone
si vous fournissez une requête.PARALLELISM : entier qui influe sur le nombre de tâches de lecture créées en parallèle. Il se peut que le nombre de flux de lecture créés soit inférieur à celui demandé.
QUERY : chaîne contenant une requête SQL à lire à partir de la base de données BigQuery. Définie sur
None
si aucune requête n'est nécessaire.
Transformer les données
Mettez à jour et supprimez des lignes et des colonnes de vos tables BigQuery à l'aide de pyarrow
ou de pandas
. Si vous souhaitez utiliser des transformations pandas
, conservez le type d'entrée pyarrow et convertissez-le en pandas
dans la fonction définie par l'utilisateur (UDF) afin de pouvoir détecter les erreurs de type de conversion pandas
dans l'UDF. Une tâche Ray doit effectuer la transformation.
@ray.remote def run_remotely(): # BigQuery Read first import pandas as pd import pyarrow as pa def filter_batch(table: pa.Table) -> pa.Table: df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get) # PANDAS_TRANSFORMATIONS_HERE return pa.Table.from_pandas(df) ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle() ds.materialize() # You can repartition before writing to determine the number of write blocks ds = ds.repartition(4) ds.materialize()
Écrire des données dans BigQuery
Insérez des données dans votre ensemble de données BigQuery. Une tâche Ray doit effectuer l'écriture.
@ray.remote def run_remotely(): # BigQuery Read and optional data transformation first dataset=DATASET vertex_ray.data.write_bigquery( ds, dataset=dataset )
Où :
- DATASET : ensemble de données BigQuery. L'ensemble de données doit être au format
dataset.table
.