Bigtable change streams to Vector Search template

This template creates a streaming pipeline to stream Bigtable data change records and write them to Vertex AI Vector Search using Dataflow Runner V2.

Pipeline requirements

  • The Bigtable source instance must exist.
  • The Bigtable source table must exist, and the table must have change streams enabled.
  • The Bigtable application profile must exist.
  • The Vector Search index path must exist.

Template parameters

Required parameters

  • embeddingColumn: The fully qualified column name where the embeddings are stored. In the format cf:col.
  • embeddingByteSize: The byte size of each entry in the embeddings array. Use 4 for Float, and 8 for Double. Defaults to: 4.
  • vectorSearchIndex: The Vector Search Index where changes will be streamed, in the format 'projects/{projectID}/locations/{region}/indexes/{indexID}' (no leading or trailing spaces) For example, projects/123/locations/us-east1/indexes/456.
  • bigtableChangeStreamAppProfile: The Bigtable application profile ID. The application profile must use single-cluster routing and allow single-row transactions.
  • bigtableReadInstanceId: The source Bigtable instance ID.
  • bigtableReadTableId: The source Bigtable table ID.

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Bigtable Change Streams to Vector Search template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud CLI

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/ \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • EMBEDDING_COLUMN: the Embedding column
  • EMBEDDING_BYTE_SIZE: the The byte size of the embeddings array. Can be 4 or 8.
  • VECTOR_SEARCH_INDEX: the Vector Search index Path
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: the Bigtable application profile ID
  • BIGTABLE_READ_INSTANCE_ID: the source Bigtable Instance ID
  • BIGTABLE_READ_TABLE_ID: the source Bigtable table ID

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/",
     "environment": { "maxWorkers": "10" }
  }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • EMBEDDING_COLUMN: the Embedding column
  • EMBEDDING_BYTE_SIZE: the The byte size of the embeddings array. Can be 4 or 8.
  • VECTOR_SEARCH_INDEX: the Vector Search index Path
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: the Bigtable application profile ID
  • BIGTABLE_READ_INSTANCE_ID: the source Bigtable Instance ID
  • BIGTABLE_READ_TABLE_ID: the source Bigtable table ID