从 Vertex AI Feature Store(旧版)迁移到 Bigtable
将机器学习特征管理工作负载从 Vertex AI Feature Store(旧版)迁移到 Bigtable 可以提高性能和灵活性。本指南简要介绍了相关概念和迁移过程。
Vertex AI Feature Store(旧版)是一种托管式环境,使用 Bigtable 作为其在线传送层。直接在 Bigtable 上运行 AI 平台或特征存储区,而不使用 Vertex AI Feature Store(旧版),可以提高速度并降低成本。
我们建议采用最小可行迁移路径,重点是将数据从 Vertex AI Feature Store(旧版)中的底层 Bigtable 表迁移到您在 Google Cloud 项目中创建的 Bigtable 实例。
迁移的优势
迁移到 Bigtable 可带来多项战略和运营优势:
- 成本效益:您无需支付 Vertex AI Feature Store(旧版)特定的节点管理费用,通常可以降低基础设施成本。
- 直接控制:您可以完全访问 Bigtable 功能。与 Vertex AI Feature Store(旧版)相比,Bigtable 监控可提供更多指标。您还可以更好地控制自定义架构布局和伸缩。
- 高性能:Bigtable 支持高性能工作负载和高性能功能,例如写入时聚合和向量搜索。
跨产品集成:您可以访问 Bigtable 集成,例如 BigQuery 外部表、适用于 Apache Spark、Apache Flink 和 Kafka Connect 的连接器,以及从 BigQuery 进行反向 ETL。
变更数据捕获:您可以启用变更数据流,以便在发生更改时捕获对 Bigtable 特征存储区表的更改。
主要概念
本部分介绍了 Bigtable 和 BigQuery 如何实现核心 Vertex AI Feature Store(旧版)概念。
数据保留
在 Bigtable 中,您可以通过垃圾回收来管理数据保留。垃圾回收是不断自动从 Bigtable 表中移除过期和过时数据的过程。垃圾回收政策是您创建的一组规则,用于说明何时不再需要特定功能(在 Bigtable 中定义为列族)中的数据。垃圾回收政策是根据与数据关联的时间戳或您要保留的版本数来设置的。
垃圾回收是一个内置的异步后台过程,在压缩期间进行。垃圾回收按固定的时间表进行。在数据被删除之前,会显示在读取结果中,但您可以过滤读取结果以排除此数据。如需了解详情,请参阅垃圾回收概览。
此外,对于需要保留历史数据以进行模型训练或满足法规遵从要求的在线特征存储区,Bigtable 分层存储可以是一种经济实惠的解决方案。分层存储可管理不常访问的数据从 SSD 存储空间上的在线服务到低成本存储层的转移。
功能开发
在 Bigtable 中,您可以使用 Bigtable SQL 实现在线特征开发,也可以使用 BigQuery DataFrames 实现离线特征开发。
使用 Vertex AI Feature Store(旧版)时,您需要使用与 BigQuery 中准备的基础数据源相对应的开发者 API 和数据模型。然后,您可以在特征注册表中注册这些数据源和特定特征列。借助 Bigtable 特征存储区,您可以直接处理底层 BigQuery 和 Bigtable 实例中的数据,而无需映射到 Vertex AI Feature Store(旧版)数据模型。
在线功能开发
对于在线特征开发,Bigtable 提供了多种工具:
- Python 客户端库:使用 Python 版 Bigtable 客户端库直接处理 Bigtable 中的数据。
- GoogleSQL:使用 GoogleSQL 读取和转换 Bigtable 中的数据。您可以直接在 Bigtable 控制台中或通过 Python 客户端库执行 SQL 查询。
- 持续物化视图:如需创建需要定期转换和汇总的近乎实时的特征,请使用 Bigtable 持续物化视图,以便在数据进入 Bigtable 时以增量方式针对数据运行 SQL 查询。
线下功能开发
对于离线特征开发,BigQuery DataFrames 提供了一个 Python 接口,其中包含 750 多个 Pandas 和 scikit-learn API。这些 API 通过 SQL 转换实现为 BigQuery 和 BigQuery ML API。BigQuery DataFrames 特征生成功能支持内置 Python 函数和用户定义的 Python 函数。它还提供自动数据同步功能,可将批量和离线流程中创建的特征同步到 Bigtable 以用于提供服务,如下一部分中所述。
在线和离线功能同步
直接使用 Bigtable 处理机器学习工作负载时,您可以确保从 BigQuery 导入离线特征值,并对训练和部署重复使用该值,从而使训练和部署之间的特征生成代码路径保持同步。以下技术可实现功能同步:
- 批量同步:从 BigQuery 到 Bigtable 的反向 ETL 可将 BigQuery 查询的结果导出到 Bigtable。这些查询以批处理方式运行,并且可以直接从 BigQuery 中安排。
- 流式同步:BigQuery 持续查询是持续运行并将行输出到 Bigtable 表中的 SQL 语句。
- 从 BigQuery DataFrames 进行同步:为了捕获在 Python 中开发的离线特征,您可以使用 BigFrames StreamingDataFrame 生成一个 BigQuery 连续查询,该查询可捕获您的 Python 特征生成逻辑,并将数据结果与 Bigtable 同步。
- 直接基于 Bigtable 数据进行离线特征开发:您可以使用 BigQuery 外部表在 BigQuery 中针对存储在 Bigtable 中的数据构建离线特征。外部表在外观上与 BigQuery 表类似,并提供大部分相同的功能,例如联接、预定查询和高级 BigQuery SQL 函数,而无需将数据移回 BigQuery 存储空间。为避免影响应用处理流量,您可以在使用 BigQuery 外部表读取 Bigtable 数据时使用 Data Boost 无服务器计算。对于临时查询,使用 Data Boost 尤其具有成本效益。如需使用 Data Boost,请在创建外部表定义时指定 Data Boost 应用配置文件。如需详细了解 Data Boost,请参阅 Bigtable Data Boost 概览。
迁移后,您可以继续使用 Vertex AI 模型监控来跟踪模型质量。
将 Bigtable 和 BigQuery 搭配使用是构建实时分析数据库的常见模式。
迁移阶段
为确保服务持续性,迁移通常分以下几个阶段执行。
第 1 阶段:准备基础架构
在开始迁移之前,请设置目标环境:
- 在项目中创建 Bigtable 实例,以用作新的在线存储区。
- 创建 BigQuery 数据集和表,以临时存储从 Vertex AI Feature Store(旧版)导出的数据。
- 配置 IAM:确保执行迁移的账号拥有从旧版特征存储区读取数据以及向新的 Bigtable 实例写入数据的权限。如需了解详情,请参阅 使用 IAM 进行 Bigtable 访问权限控制和控制对 Vertex AI Feature Store(旧版)资源的访问权限。
第 2 阶段:定义 Vertex AI Feature Store(旧版)与 Bigtable 之间的架构映射
查看并了解 Bigtable 架构设计最佳实践。Vertex AI Feature Store(旧版)API 与 Bigtable API 的一般映射关系如下:
Vertex AI Feature Store(旧版)资源
Bigtable 组件
FeatureOnlineStoreBigtable 实例
FeatureView列族
featureValues(批次)列(每个键对应一个单元格)
featureValues(持续)列(每个键对应多个单元格 [版本控制])
定义架构映射后,创建一个 Bigtable 表,该表包含一个列族,用于存储源功能区中的每个功能。
第 3 阶段:数据提取和同步
在此阶段,您将根据数据的更新频率,采用分层方法迁移数据。
实时功能同步
对于您使用 write_feature_values 或等效 API 调用编写的功能,请开始将相同的数据写入新的 Bigtable 表。
- 安装 Python 版 Bigtable 客户端库。
- 配置应用,以同时将特征数据写入 Vertex AI Feature Store(旧版)和 Bigtable。如需详细了解如何将数据写入 Bigtable,请参阅写入。
批量功能迁移
接下来,迁移在开始双重写入之前存储的数据。这涉及将数据从 Vertex AI Feature Store(旧版)移至 BigQuery,然后再移至 Bigtable。
- 使用 Vertex AI Feature Store(旧版)的导出功能将特征存储区数据导出到 BigQuery,该功能可让您导出所有值或快照。这样一来,BigQuery 就可以充当 Vertex AI Feature Store(旧版)的离线存储区。
- 使用以下任一方法将历史数据从 BigQuery 迁移到 Bigtable:
- 反向 ETL
- Bigtable Spark 连接器
- BigQuery to Bigtable Dataflow 模板
第 4 阶段:应用和 SDK 过渡
最后一步是切换应用层。
- 迁移完成并经过测试后,停止向 Vertex AI Feature Store(旧版)写入数据。
修改应用,使其仅使用适用于 Bigtable 的 Python 客户端库。
以下示例演示了如何使用 Python 从 Bigtable 中提取单个特征。
from google.cloud import bigtable from google.cloud.bigtable import row_filters # Replace 'project_id' and 'instance_id' with your actual IDs. client = bigtable.Client(project=project_id) instance = client.instance(instance_id) #return only the latest feature row_filter = bigtable.row_filters.CellsColumnLimitFilter(1) # Replace 'user1' and 'feature0` with your actual row key and column qualifier. print("Getting a single feature by row key.") key = "user1".encode() row = table.read_row(key, row_filter) cell = row.cells[column_family_id.decode("utf-8")][feature0][0] print(cell.value.decode("utf-8"))如需查看有关如何使用 Bigtable 数据和管理 API 读取和写入数据的其他示例,请参阅 Python 版 Hello World。
Bigtable 的 Python 客户端库还允许您使用 GoogleSQL 返回符合过滤条件的特征,或对特征执行转换。以下示例展示了如何从 Bigtable Python 客户端库异步调用 SQL 查询。如需详细了解适用于 Bigtable 的 GoogleSQL,请参阅其他 SQL 示例。
import asyncio from google.cloud.bigtable.data_async import BigtableDataClient from google.cloud.bigtable_v2.types import ExecuteQueryRequest async def run_bigtable_sql_query(project_id, instance_id, table_id): """ Runs a GoogleSQL query on a Bigtable table using the async client. """ client = BigtableDataClient(project_id=project_id) instance = client.instance(instance_id) table = instance.table(table_id) # Example query: Select a specific row and all columns from a column family # Replace 'my_table' and 'my_cf' with your actual table and column family IDs. # The table name in the SQL must be in the format `dataset.table`, # where dataset is the instance ID and table is the table ID (in backticks). sql_query = f"SELECT _key, my_cf FROM `{instance_id}`.`{table_id}` WHERE _key = 'user_123'" print(f"Executing query: {sql_query}") # The client library automatically handles the SQL execution try: # The query method returns an AsyncPartialRowsIterator results_iterator = await table.query(query=sql_query) async for row in results_iterator: print(f"Row key: {row.row_key.decode('utf-8')}") # Iterate through the cells in the row for col_family, cells in row.cells.items(): for cell in cells: print(f" Column Family: {col_family}, Qualifier: {cell.qualifier.decode('utf-8')}, Value: {cell.value.decode('utf-8')}, Timestamp: {cell.timestamp_micros}") except Exception as e: print(f"An error occurred: {e}") finally: await client.close() if __name__ == "__main__": # TODO(developer): Replace with your project, instance, and table IDs your_project_id = "your-gcp-project-id" your_instance_id = "your-bigtable-instance-id" your_table_id = "your-bigtable-table-id" # Run the asynchronous function asyncio.run(run_bigtable_sql_query(your_project_id, your_instance_id, your_table_id))开始使用 Bigtable 指标监控延迟时间和吞吐量。如需了解详情,请参阅 Monitoring。
最佳做法
从 Vertex AI Feature Store(旧版)迁移到 Bigtable 特征存储区实现后,您需要复制之前由该服务处理的内部预处理和优化逻辑,以保持稳定性和性能。
客户端自适应限制
Vertex AI Feature Store(旧版)后端利用客户端自适应节流器来保护其底层 Bigtable 实例,以免在流量高峰期间或存储后端出现高延迟或错误时过载。建议您在应用代码中实现类似的节流器,以注册后端响应并在需要时主动节流请求。
请求分区和批次大小优化
Bigtable 行过滤器的硬性限制为 20 KB。在单个过滤读取请求中请求过多的特征或实体 ID 可能会导致请求失败。如需镜像 Vertex AI Feature Store(旧版)的行为,请执行以下操作:
- 分块特征 ID:将每次 Bigtable 读取的特征 ID 数量限制为大约 100 个。
- 平衡实体批次:为防止在执行多实体读取时使客户端或服务器资源饱和,请采取以下预防措施:
- 将实体划分为小型的并发批次,例如每个批次 10 个实体。
- 限制并发批处理请求的数量上限,例如 10-20。
智能过滤条件选择
在服务器端计算和应用列过滤条件会增加开销。如果您的应用通常会请求列族中的几乎所有特征(例如 >99.9%),则更高效的做法是跳过列过滤条件并读取整行,然后在客户端过滤结果。
并发和异步执行
为了在流式传输场景中尽可能缩短首次结果返回时间,请使用异步模式或线程包来并行提取实体批次。这样可确保应用在第一批结果返回后立即开始处理结果,而无需等待大型串行读取完成。
后续步骤
- 如需有关高吞吐量工作负载的帮助或架构指导,请与您的客户代表联系。
- 了解 Feast 的 Bigtable 集成。
- 了解 Credit Karma 如何利用 Bigtable 和 BigQuery 将模型预测次数扩增到每天 600 亿次。