将 Ray on Agent Platform 与 BigQuery 搭配使用

在 Gemini Enterprise Agent Platform 上运行 Ray 应用时,请将 BigQuery 用作云数据库。本部分介绍如何从 Gemini Enterprise Agent Platform 上的 Ray 集群 对 BigQuery 数据库进行读取和写入操作。 本部分中的步骤假定您使用的是 Agent Platform SDK for Python。

如需从 BigQuery 数据集读取数据,新建 BigQuery 数据集或使用现有数据集。

导入并初始化 Agent Platform 客户端上的 Ray

如果您已连接到 Gemini Enterprise Agent Platform 上的 Ray 集群,请重启内核并运行以下代码。连接时必须采用 runtime_env 变量才能运行 BigQuery 命令。

import ray
from google.cloud import aiplatform

# The CLUSTER_RESOURCE_NAME is the one returned from vertex_ray.create_ray_cluster.
address = 'vertex_ray://{}'.format(CLUSTER_RESOURCE_NAME)

runtime_env = {
    "pip":
       ["google-cloud-aiplatform[ray]","ray==2.47.1"]
  }

ray.init(address=address, runtime_env=runtime_env)

从 BigQuery 中读取数据

从 BigQuery 数据集读取数据。必须通过一个 Ray 任务来执行读取操作。

aiplatform.init(project=PROJECT_ID, location=LOCATION)

@ray.remote
def run_remotely():
    import vertex_ray
    dataset = DATASET
    parallelism = PARALLELISM
    query = QUERY

    ds = vertex_ray.data.read_bigquery(
        dataset=dataset,
        parallelism=parallelism,
        query=query
    )
    ds.materialize()

其中:

  • PROJECT_ID: Google Cloud 项目 ID。您可以在 Google Cloud 控制台的欢迎页面中找到项目 ID。

  • LOCATIONDataset 的存储位置。例如 us-central1

  • DATASET:BigQuery 数据集。必须采用 dataset.table 格式。如果您提供查询,则设置为 None

  • PARALLELISM:一个整数,用于指定并行创建的读取任务数。创建的读取流数可能少于您请求的数量。

  • QUERY:一个字符串,其中包含用于从 BigQuery 数据库读取数据的 SQL 查询。如果不需要查询,则设置为 None

转换数据

使用 pyarrowpandas 更新和删除 BigQuery 表中的行和列。如果您想使用 pandas 转换,建议将输入保留为 pyarrow 类型,在用户定义的函数 (UDF) 中再将其转换为 pandas 类型,以便您可以捕获 UDF 中的任何 pandas 转换类型错误。必须通过一个 Ray 任务来执行转换。

@ray.remote
def run_remotely():
    # BigQuery Read first
    import pandas as pd
    import pyarrow as pa

    def filter_batch(table: pa.Table) -> pa.Table:
        df = table.to_pandas(types_mapper={pa.int64(): pd.Int64Dtype()}.get)
        # PANDAS_TRANSFORMATIONS_HERE
        return pa.Table.from_pandas(df)

    ds = ds.map_batches(filter_batch, batch_format="pyarrow").random_shuffle()
    ds.materialize()

    # You can repartition before writing to determine the number of write blocks
    ds = ds.repartition(4)
    ds.materialize()

将数据写入 BigQuery

将数据插入 BigQuery 数据集。 必须通过一个 Ray 任务来执行写入。

@ray.remote
def run_remotely():
    # BigQuery Read and optional data transformation first
    dataset=DATASET
    vertex_ray.data.write_bigquery(
        ds,
        dataset=dataset
    )

其中:

  • DATASET:BigQuery 数据集。数据集必须采用 dataset.table 格式。

后续步骤