Vertex AI Feature Store(従来版)から Bigtable に移行する

ML 特徴管理ワークロードを Vertex AI Feature Store(従来版)から Bigtable に移行すると、パフォーマンスと柔軟性が向上します。このガイドでは、関連するコンセプトと移行プロセスの概要について説明します。

Vertex AI Feature Store(従来版)は、オンライン サービング レイヤに Bigtable を使用するマネージド環境です。Vertex AI Feature Store(従来版)を使用せずに、AI プラットフォームまたは特徴ストアを Bigtable で直接実行すると、速度が向上し、費用を削減できます。

Vertex AI Feature Store(従来版)の基盤となる Bigtable テーブルから、 Google Cloud プロジェクトで作成する Bigtable インスタンスにデータを移行することに重点を置いた、最小限の実行可能な移行パスをおすすめします。

移行のメリット

Bigtable への移行には、戦略的および運用上の利点がいくつかあります。

  • 費用対効果: Vertex AI Feature Store(従来版)固有のノード管理プレミアムが不要になり、インフラストラクチャ費用が削減されることがよくあります。
  • 直接制御: Bigtable の機能を完全に利用できます。Bigtable モニタリングでは、Vertex AI Feature Store(従来版)よりも多くの指標が公開されます。カスタム アーキテクチャのレイアウトとスケーリングをより細かく制御することもできます。
  • 高パフォーマンス: Bigtable は、高パフォーマンスのワークロードと、書き込み時の集計ベクトル検索などの高パフォーマンス機能をサポートしています。
  • プロダクト間の統合: BigQuery 外部テーブルApache SparkApache FlinkKafka Connect のコネクタ、BigQuery からのリバース ETL などの Bigtable 統合にアクセスできます。

  • 変更データ キャプチャ: 変更ストリームを有効にして、Bigtable 特徴ストア テーブルに対する変更を、変更の発生時にキャプチャできます。

主なコンセプト

このセクションでは、Vertex AI Feature Store(従来版)のコアコンセプトが Bigtable と BigQuery によってどのように実装されるかについて説明します。

データの保持

Bigtable では、ガベージ コレクションを使用してデータ保持を管理します。ガベージ コレクションは、期限切れデータや古くなったデータを Bigtable テーブルから自動的かつ継続的に削除するプロセスです。ガベージ コレクション ポリシーは、Bigtable で列ファミリーとして定義された特定の機能のデータが不要になるタイミングを規定するユーザー作成の複数のルールからなります。ガベージ コレクション ポリシーは、データに関連付けられたタイムスタンプまたは保持するバージョン数に基づいて設定されます。

ガベージ コレクションは、コンパクション中に実行される組み込みの非同期バックグラウンド プロセスです。ガベージ コレクションは固定スケジュールで実行されます。データが削除されるまで、読み取り結果に表示されますが、読み取りをフィルタリングしてこのデータを除外できます。詳細については、ガベージ コレクションの概要をご覧ください。

また、Bigtable の階層型ストレージは、モデル トレーニングや規制遵守のために履歴データを保持する必要があるオンライン特徴ストアにとって、費用対効果の高いソリューションとなります。階層型ストレージは、アクセス頻度の低いデータを SSD ストレージのオンライン サービングから低コストのストレージ階層に移動します。

機能開発

Bigtable では、Bigtable SQL を使用してオンライン機能開発を実装できます。また、BigQuery DataFrames を使用してオフライン機能開発を実装できます。

Vertex AI Feature Store(レガシー)を使用する場合、BigQuery で準備された基盤となるデータソースにマッピングされるデベロッパー API とデータモデルを使用します。次に、これらのデータソースと特定の特徴列を特徴レジストリに登録します。Bigtable 特徴ストアを使用すると、Vertex AI Feature Store(レガシー)データモデルにマッピングする必要なく、基盤となる BigQuery インスタンスと Bigtable インスタンスのデータを直接操作できます。

オンライン機能の開発

オンライン特徴開発では、Bigtable には次のようないくつかのツールが用意されています。

オフラインの特徴開発

オフラインの特徴開発では、BigQuery DataFrames が 750 を超える pandas API と scikit-learn API を備えた Python インターフェースを提供します。これらの API は、BigQuery API と BigQuery ML API への SQL 変換を通じて実装されます。BigQuery DataFrames の特徴生成では、組み込みの Python 関数とユーザー定義の Python 関数の両方を使用できます。また、次のセクションで説明するように、バッチ処理とオフライン処理で作成された特徴をサービングするために、Bigtable への自動データ同期も提供します。

オンライン機能とオフライン機能の同期

ML ワークロードに Bigtable を直接使用すると、オフラインの特徴値が BigQuery からインポートされ、トレーニングとサービングの両方に同じ値が再利用されるため、トレーニングとサービングの間で特徴を生成するコードパスの同期を維持できます。次のテクノロジーにより、機能の同期が可能になります。

  • バッチ同期: BigQuery から Bigtable へのリバース ETL により、BigQuery クエリの結果を Bigtable にエクスポートできます。これらのクエリはバッチで実行され、BigQuery から直接スケジュール設定できます。
  • ストリーミング同期: BigQuery の継続的クエリは、継続的に実行され、Bigtable テーブルに行を出力する SQL ステートメントです。
  • BigQuery DataFrames からの同期: Python で開発されたオフライン機能をキャプチャするには、BigFrames StreamingDataFrame を使用して、特徴生成用の Python ロジックをキャプチャし、データ結果を Bigtable と同期する BigQuery 継続的クエリを生成します。
  • Bigtable データに対するオフライン特徴の直接開発: BigQuery 外部テーブルを使用して、Bigtable に保存されているデータに対して BigQuery でオフライン特徴を構築できます。外部テーブルは BigQuery テーブルの外観を反映し、データを BigQuery ストレージに戻す必要なく、結合、スケジュール設定されたクエリ、高度な BigQuery SQL 関数など、ほとんど同じ機能を提供します。トラフィックを処理するアプリケーションへの影響を防ぐには、BigQuery 外部テーブルを使用して Bigtable データを読み取るときに Data Boost サーバーレス コンピューティングを使用します。Data Boost は、アドホック クエリで特に費用対効果を発揮します。Data Boost を使用するには、外部テーブル定義の作成時に Data Boost アプリ プロファイルを指定します。Data Boost の詳細については、 Bigtable Data Boost の概要をご覧ください。

移行後も、Vertex AI Model Monitoring を使用してモデルの品質を追跡できます。

Bigtable と BigQuery を組み合わせて使用することは、リアルタイム分析データベースを構築する際の一般的なパターンです。

移行の各フェーズ

サービスの継続性を確保するため、移行は通常、次の異なるフェーズで実行されます。

フェーズ 1: インフラストラクチャを準備する

移行を開始する前に、移行先環境を設定します。

フェーズ 2: Vertex AI Feature Store(従来版)と Bigtable 間のスキーマ マッピングを定義する

  1. Bigtable スキーマ設計のベスト プラクティスを確認して理解します。Vertex AI Feature Store(従来版)API と Bigtable API の一般的なマッピングは次のとおりです。

    Vertex AI Feature Store(従来版)リソース

    Bigtable コンポーネント

    FeatureOnlineStore

    Bigtable のインスタンス

    FeatureView

    列ファミリー

    featureValues(バッチ)

    列(キーごとに 1 つのセル)

    featureValues(継続的)

    列(キーあたりの複数のセル [バージョン管理])

  2. スキーマ マッピングを定義したら、ソース特徴ストアの各特徴の列ファミリーを含む Bigtable テーブルを作成します。

フェーズ 3: データの抽出と同期

このフェーズでは、データの更新頻度に基づいて階層化されたアプローチを使用してデータを移行します。

リアルタイム機能の同期

write_feature_values または同等の API 呼び出しで記述している機能については、新しい Bigtable テーブルに同じデータの書き込みを開始します。

  1. Bigtable 用 Python クライアント ライブラリをインストールします。
  2. 特徴データを Vertex AI Feature Store(従来版)と Bigtable に同時に書き込むようにアプリケーションを構成します。Bigtable へのデータの書き込みの詳細については、書き込みをご覧ください。

バッチ特徴の移行

次に、デュアル書き込みを開始する前に保存されたデータを移行します。これには、Vertex AI Feature Store(従来版)から BigQuery、Bigtable にデータを移動する必要があります。

  1. Vertex AI Feature Store(従来版)のエクスポート機能を使用して、featurestore データを BigQuery にエクスポートします。これにより、すべての値またはスナップショットをエクスポートできます。これにより、BigQuery を Vertex AI Feature Store(従来版)のオフライン ストアとして使用できます。
  2. 次のいずれかを使用して、BigQuery から Bigtable に履歴データを移行します。
    1. リバース ETL
    2. Bigtable Spark コネクタ
    3. BigQuery to Bigtable Dataflow テンプレート

フェーズ 4: アプリケーションと SDK の移行

最後のステップは、アプリケーション レイヤの切り替えです。

  1. 移行が完了してテストされたら、Vertex AI Feature Store(従来版)への書き込みを停止します。
  2. Bigtable 用の Python クライアント ライブラリのみを使用するようにアプリケーションを変更します。

    次の例は、Python を使用して Bigtable から単一の特徴を取得する方法を示しています。

    from google.cloud import bigtable
    from google.cloud.bigtable import row_filters
    # Replace 'project_id' and 'instance_id' with your actual IDs.
    client = bigtable.Client(project=project_id)
    instance = client.instance(instance_id)
    
    #return only the latest feature
    row_filter = bigtable.row_filters.CellsColumnLimitFilter(1)
    
    # Replace 'user1' and 'feature0` with your actual row key and column qualifier.
    
    print("Getting a single feature by row key.")
    key = "user1".encode()
    
    row = table.read_row(key, row_filter)
    cell = row.cells[column_family_id.decode("utf-8")][feature0][0]
    print(cell.value.decode("utf-8"))
    

    Bigtable データ API と管理 API を使用してデータを読み書きする方法の別の例については、Python の Hello World をご覧ください。

    Bigtable 用の Python クライアント ライブラリでは、GoogleSQL を使用してフィルタ条件を満たす特徴を返すことや、特徴の変換を行うこともできます。次の例は、Bigtable Python クライアント ライブラリから SQL クエリを非同期で呼び出す方法を示しています。Bigtable 用 GoogleSQL の詳細については、SQL のその他の例をご覧ください。

    import asyncio
    from google.cloud.bigtable.data_async import BigtableDataClient
    from google.cloud.bigtable_v2.types import ExecuteQueryRequest
    
    async def run_bigtable_sql_query(project_id, instance_id, table_id):
        """
        Runs a GoogleSQL query on a Bigtable table using the async client.
        """
        client = BigtableDataClient(project_id=project_id)
        instance = client.instance(instance_id)
        table = instance.table(table_id)
    
        # Example query: Select a specific row and all columns from a column family
        # Replace 'my_table' and 'my_cf' with your actual table and column family IDs.
        # The table name in the SQL must be in the format `dataset.table`,
        # where dataset is the instance ID and table is the table ID (in backticks).
        sql_query = f"SELECT _key, my_cf FROM `{instance_id}`.`{table_id}` WHERE _key = 'user_123'"
    
        print(f"Executing query: {sql_query}")
    
        # The client library automatically handles the SQL execution
        try:
            # The query method returns an AsyncPartialRowsIterator
            results_iterator = await table.query(query=sql_query)
    
            async for row in results_iterator:
                print(f"Row key: {row.row_key.decode('utf-8')}")
                # Iterate through the cells in the row
                for col_family, cells in row.cells.items():
                    for cell in cells:
                        print(f"  Column Family: {col_family}, Qualifier: {cell.qualifier.decode('utf-8')}, Value: {cell.value.decode('utf-8')}, Timestamp: {cell.timestamp_micros}")
    
        except Exception as e:
            print(f"An error occurred: {e}")
        finally:
            await client.close()
    
    if __name__ == "__main__":
        # TODO(developer): Replace with your project, instance, and table IDs
        your_project_id = "your-gcp-project-id"
        your_instance_id = "your-bigtable-instance-id"
        your_table_id = "your-bigtable-table-id"
    
        # Run the asynchronous function
        asyncio.run(run_bigtable_sql_query(your_project_id, your_instance_id, your_table_id))
    
  3. Bigtable の指標を使用して、レイテンシとスループットのモニタリングを開始します。詳細については、モニタリングをご覧ください。

ベスト プラクティス

Vertex AI Feature Store(従来版)から Bigtable 特徴量ストアの実装に移行したら、以前にサービスで処理されていた内部の前処理と最適化のロジックを複製して、安定性とパフォーマンスを維持する必要があります。

クライアントサイドのアダプティブ スロットリング

Vertex AI Feature Store(従来版)のバックエンドは、クライアントサイドの適応型スロットラーを使用して、トラフィックの急増時や、ストレージ バックエンドでレイテンシの増加やエラーが発生した場合に、基盤となる Bigtable インスタンスが過負荷にならないようにします。バックエンド レスポンスを登録し、必要に応じてリクエストを事前にスロットリングするために、アプリケーション コードで同様のスロットラーを実装することをおすすめします。

リクエストのパーティショニングとバッチサイズの最適化

Bigtable 行フィルタには 20 KB の上限があります。1 回のフィルタリングされた読み取りで特徴量またはエンティティ ID をリクエストしすぎると、リクエストが失敗する可能性があります。Vertex AI Feature Store(従来版)の動作をミラーリングするには、次の操作を行います。

  • チャンク特徴 ID: Bigtable 読み取りあたりの特徴 ID の数を約 100 に制限します。
  • エンティティ バッチのバランスを取る: 複数のエンティティの読み取りを実行するときにクライアントまたはサーバーのリソースが飽和状態にならないようにするには、次の予防措置を講じます。
    • エンティティを小さな同時実行バッチ(バッチあたり 10 個のエンティティなど)に分割します。
    • 同時バッチ リクエストの最大数を 10 ~ 20 などに制限します。

インテリジェントなフィルタ選択

サーバー側で列フィルタを計算して適用すると、オーバーヘッドが発生します。アプリケーションが通常、列ファミリーのほぼすべての特徴(99.9% 超など)をリクエストする場合は、列フィルタをスキップして行全体を読み取り、結果をクライアント側でフィルタリングする方が効率的です。

同時実行と非同期実行

ストリーミング シナリオで初回結果までの時間を最小限に抑えるには、非同期パターンまたはスレッド バンドルを使用してエンティティ バッチを並行して取得します。これにより、アプリケーションは、大規模なシリアル読み取りの完了を待つのではなく、最初のバッチが返されるとすぐに結果の処理を開始できます。

次のステップ