Bigtable 変更ストリームからベクトル検索テンプレートへ

このテンプレートでは、Bigtable データ変更レコードをストリーミングし、Dataflow Runner V2 を使用して Vertex AI Vector Search に書き込むストリーミング パイプラインを作成します。

パイプラインの要件

  • Bigtable ソース インスタンスが存在している必要があります。
  • Bigtable ソーステーブルが存在し、そのテーブルで変更ストリームが有効になっている必要があります。
  • Bigtable アプリケーション プロファイルが存在している必要があります。
  • ベクトル検索インデックスのパスが存在している必要があります。

テンプレートのパラメータ

必須パラメータ

  • embeddingColumn: エンベディングが保存される列の完全修飾名。形式は cf:col です。
  • embeddingByteSize: エンベディング配列内の各エントリのバイトサイズ。浮動小数点数には 4、倍精度には 8 を使用します。デフォルトは 4 です。
  • vectorSearchIndex: 変更がストリーミングされるベクトル検索インデックス。形式は 'projects/{projectID}/locations/{region}/indexes/{indexID}' です(先頭または末尾にスペースを入れないでください)。例: projects/123/locations/us-east1/indexes/456
  • bigtableChangeStreamAppProfile: Bigtable アプリケーション プロファイル ID。アプリケーション プロファイルでは、単一クラスタ ルーティングを使用し、単一行のトランザクションを許可する必要があります。
  • bigtableReadInstanceId: ソース Bigtable インスタンス ID。
  • bigtableReadTableId: ソース Bigtable テーブル ID。

オプション パラメータ

  • bigtableMetadataTableTableId: メタデータ テーブルの作成に使用されるテーブル ID。
  • crowdingTagColumn: クラウディング タグが保存される列の完全修飾名。形式は cf:col です。
  • allowRestrictsMappings: allow 制限として使用する列のカンマ区切りの完全修飾名とそのエイリアス。形式は cf:col->alias です。
  • denyRestrictsMappings: deny 制限として使用する列のカンマ区切りの完全修飾名とそのエイリアス。形式は cf:col->alias です。
  • intNumericRestrictsMappings: 整数の numeric_restricts として使用する列のカンマ区切りの完全修飾名とそのエイリアス。形式は cf:col->alias です。
  • floatNumericRestrictsMappings: 浮動小数点数(4 バイト)の numeric_restricts として使用する列のカンマ区切りの完全修飾名とそのエイリアス。形式は cf:col->alias です。
  • doubleNumericRestrictsMappings: 倍精度(8 バイト)の numeric_restricts として使用する列のカンマ区切りの完全修飾名とそのエイリアス。形式は cf:col->alias です。
  • upsertMaxBatchSize: バッチをベクトル検索インデックスに upsert する前にバッファに格納する upsert の最大数。バッチは、upsertBatchSize レコードの準備が整ったとき、またはいずれかのレコードの upsertBatchDelay 待機時間が経過したときに送信されます。例: 10。デフォルトは 10 です。
  • upsertMaxBufferDuration: upsert のバッチがベクトル検索に送信されるまでの最大遅延。バッチは、upsertBatchSize レコードの準備が整ったとき、またはいずれかのレコードの upsertBatchDelay 待機時間が経過したときに送信されます。指定できる形式は、秒が Ns(例: 5s)、分が Nm(例: 12m)、時間が Nh(例: 2h)です例: 10s。デフォルトは 10s です。
  • deleteMaxBatchSize: バッチをベクトル検索インデックスから削除する前にバッファに格納する削除の最大数。バッチは、deleteBatchSize レコードの準備が整ったとき、またはいずれかのレコードの deleteBatchDelay 待機時間が経過したときに送信されます。例: 10。デフォルトは 10 です。
  • deleteMaxBufferDuration: 削除のバッチがベクトル検索に送信されるまでの最大遅延。バッチは、deleteBatchSize レコードの準備が整ったとき、またはいずれかのレコードの deleteBatchDelay 待機時間が経過したときに送信されます。指定できる形式は、秒が Ns(例: 5s)、分が Nm(例: 12m)、時間が Nh(例: 2h)です例: 10s。デフォルトは 10s です。
  • dlqDirectory: 未処理のレコードを処理できなかった理由とともに保存するパス。デフォルトは、Dataflow ジョブの一時保存場所の下のディレクトリです。ほとんどの場合は、デフォルト値のまま使用できます。
  • bigtableChangeStreamMetadataInstanceId: Bigtable 変更ストリームのメタデータ インスタンス ID。デフォルトは空です。
  • bigtableChangeStreamMetadataTableTableId: Bigtable 変更ストリーム コネクタのメタデータ テーブル ID。指定しない場合、パイプライン実行中に Bigtable 変更ストリーム コネクタのメタデータ テーブルが自動的に作成されます。デフォルトは空です。
  • bigtableChangeStreamCharset: Bigtable 変更ストリームの文字セット名。デフォルトは UTF-8 です。
  • bigtableChangeStreamStartTimestamp: 変更ストリームの読み取りに使用される開始タイムスタンプ(https://tools.ietf.org/html/rfc3339)。例: 2022-05-05T07:59:59Z。デフォルトは、パイプラインの開始時間のタイムスタンプです。
  • bigtableChangeStreamIgnoreColumnFamilies: 変更を無視する列ファミリー名のカンマ区切りのリスト。デフォルトは空です。
  • bigtableChangeStreamIgnoreColumns: 変更を無視する列名のカンマ区切りのリスト。例: "cf1:col1,cf2:col2"。デフォルトは空です。
  • bigtableChangeStreamName: クライアント パイプラインの一意の名前。実行中のパイプラインが停止した時点から処理を再開できます。デフォルトは自動生成された名前です。使用される値については、Dataflow ジョブのログをご覧ください。
  • bigtableChangeStreamResume: true に設定すると、同じ bigtableChangeStreamName 値で実行中のパイプラインが停止した時点から、新しいパイプラインが処理を再開します。指定された bigtableChangeStreamName 値を持つパイプラインが一度も実行されていない場合、新しいパイプラインは開始されません。false に設定すると、新しいパイプラインが開始されます。特定のソースに対して同じ bigtableChangeStreamName 値を持つパイプラインがすでに実行されている場合、新しいパイプラインは開始されません。デフォルトは false です。
  • bigtableReadChangeStreamTimeoutMs: Bigtable ReadChangeStream リクエストのタイムアウト(ミリ秒)。
  • bigtableReadProjectId: Bigtable プロジェクト ID。デフォルトは Dataflow ジョブのプロジェクトです。

テンプレートを実行する

コンソール

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Bigtable Change Streams to Vector Search template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

gcloud CLI

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • EMBEDDING_COLUMN: エンベディング列
  • EMBEDDING_BYTE_SIZE: エンベディング配列のバイトサイズ。4 または 8 のいずれかです。
  • VECTOR_SEARCH_INDEX: ベクトル検索インデックスのパス
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: Bigtable アプリケーション プロファイル ID
  • BIGTABLE_READ_INSTANCE_ID: ソース Bigtable インスタンス ID
  • BIGTABLE_READ_TABLE_ID: ソース Bigtable テーブル ID

API

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内の日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内の対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • EMBEDDING_COLUMN: エンベディング列
  • EMBEDDING_BYTE_SIZE: エンベディング配列のバイトサイズ。4 または 8 のいずれかです。
  • VECTOR_SEARCH_INDEX: ベクトル検索インデックスのパス
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: Bigtable アプリケーション プロファイル ID
  • BIGTABLE_READ_INSTANCE_ID: ソース Bigtable インスタンス ID
  • BIGTABLE_READ_TABLE_ID: ソース Bigtable テーブル ID