Saat menjalankan aplikasi Ray di Platform Agen Gemini Enterprise, gunakan BigQuery sebagai database cloud. Bagian ini membahas cara membaca dan menulis ke database BigQuery dari cluster Ray di Platform Agen Gemini Enterprise. Langkah-langkah di bagian ini mengasumsikan bahwa Anda menggunakan Agent Platform SDK for Python.
Untuk membaca dari set data BigQuery, buat set data BigQuery baru atau gunakan set data yang ada.
Mengimpor dan menginisialisasi klien Ray di Platform Agen
Jika terhubung ke cluster Ray di Platform Agen Gemini Enterprise, mulai ulang kernel dan jalankan kode berikut. Variabel runtime_env diperlukan pada waktu koneksi untuk menjalankan perintah BigQuery.
import ray from google.cloud import aiplatform # The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster. address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME) runtime_env = { "pip": ["google-cloud-aiplatform[ray]","ray==2.47.1"] } ray.init(address=address, runtime_env=runtime_env)
Membaca data dari BigQuery
Membaca data dari set data BigQuery Anda. Tugas Ray harus melakukan operasi baca.
aiplatform.init(project=PROJECT_ID, location=LOCATION) @ray.remote def run_remotely(): import vertex_ray dataset = DATASET parallelism = PARALLELISM query = QUERY ds = vertex_ray.data.read_bigquery( dataset=dataset, parallelism=parallelism, query=query ) ds.materialize()
Dengan:
PROJECT_ID: Google Cloud ID project. Temukan ID project di Google Cloud halaman sambutan konsol.
LOCATION: Lokasi tempat
Datasetdisimpan. Contohnya,us-central1.DATASET: Set data BigQuery. Formatnya harus
dataset.table. Tetapkan keNonejika Anda memberikan kueri.PARALLELISM: Bilangan bulat yang memengaruhi jumlah tugas baca yang dibuat secara paralel. Jumlah aliran baca yang dibuat mungkin lebih sedikit dari yang Anda minta.
QUERY: String yang berisi kueri SQL untuk dibaca dari database BigQuery. Tetapkan ke
Nonejika tidak ada kueri yang diperlukan.
Mentransformasi data
Perbarui dan hapus baris dan kolom dari tabel BigQuery menggunakan pyarrow atau pandas. Jika Anda ingin menggunakan transformasi pandas, pertahankan jenis input sebagai pyarrow dan konversi ke pandas dalam fungsi yang ditentukan pengguna (UDF) sehingga Anda dapat menangkap error jenis konversi pandas dalam UDF. Tugas
Ray harus melakukan transformasi.
@ray.remote def run_remotely(): # BigQuery Read first import pandas as pd import pyarrow as pa def filter_batch(table: pa.Table) -> pa.Table: df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get) # PANDAS_TRANSFORMATIONS_HERE return pa.Table.from_pandas(df) ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle() ds.materialize() # You can repartition before writing to determine the number of write blocks ds = ds.repartition(4) ds.materialize()
Menulis data ke BigQuery
Menyisipkan data ke set data BigQuery. Tugas Ray harus melakukan penulisan.
@ray.remote def run_remotely(): # BigQuery Read and optional data transformation first dataset=DATASET vertex_ray.data.write_bigquery( ds, dataset=dataset )
Dengan:
- DATASET: Set data BigQuery. Set data harus dalam format
dataset.table.
Langkah berikutnya
Men-deploy model di Platform Agen Gemini Enterprise dan mendapatkan prediksi
Melihat log untuk cluster Ray di Platform Agen Gemini Enterprise