This template creates a streaming pipeline that reads Bigtable data change records and writes them to Vertex AI Vector Search using Dataflow Runner V2.
Pipeline requirements
- The Bigtable source instance must exist.
- The Bigtable source table must exist, and the table must have change streams enabled.
- The Bigtable application profile must exist.
- The Vector Search index must exist.
Template parameters
Required parameters
- embeddingColumn: The fully qualified column name where the embeddings are stored, in the format cf:col.
- embeddingByteSize: The byte size of each entry in the embeddings array. Use 4 for Float, and 8 for Double. Defaults to: 4.
- vectorSearchIndex: The Vector Search index where changes will be streamed, in the format 'projects/{projectID}/locations/{region}/indexes/{indexID}' (no leading or trailing spaces). For example, projects/123/locations/us-east1/indexes/456.
- bigtableChangeStreamAppProfile: The Bigtable application profile ID. The application profile must use single-cluster routing and allow single-row transactions.
- bigtableReadInstanceId: The source Bigtable instance ID.
- bigtableReadTableId: The source Bigtable table ID.
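To illustrate the embeddingColumn, embeddingByteSize, and vectorSearchIndex formats described above, the following Python sketch serializes an embedding into the byte layout that embeddingByteSize implies and checks an index path against the documented pattern. It assumes big-endian IEEE 754 values, which the parameter descriptions do not state, so confirm the byte order that your writer uses. The helper names are hypothetical and are not part of the template.

```python
import re
import struct

# Pattern derived from the documented vectorSearchIndex format.
INDEX_PATH = re.compile(r"projects/[^/\s]+/locations/[^/\s]+/indexes/[^/\s]+")


def is_valid_index_path(path: str) -> bool:
    """Return True if path looks like projects/{projectID}/locations/{region}/indexes/{indexID}."""
    return INDEX_PATH.fullmatch(path) is not None


def _type_code(byte_size: int) -> str:
    """Map embeddingByteSize to a struct type code: 4 -> float, 8 -> double."""
    if byte_size not in (4, 8):
        raise ValueError("embeddingByteSize must be 4 or 8")
    return "f" if byte_size == 4 else "d"


def pack_embedding(values, byte_size: int = 4) -> bytes:
    """Serialize an embedding as consecutive 4-byte or 8-byte entries.

    Assumption: big-endian byte order (not stated in the parameter list).
    """
    return struct.pack(f">{len(values)}{_type_code(byte_size)}", *values)


def unpack_embedding(data: bytes, byte_size: int = 4) -> list:
    """Inverse of pack_embedding, under the same byte-order assumption."""
    code = _type_code(byte_size)
    if len(data) % byte_size != 0:
        raise ValueError("byte length is not a multiple of embeddingByteSize")
    return list(struct.unpack(f">{len(data) // byte_size}{code}", data))
```

A three-element embedding written with embeddingByteSize=4 occupies 12 bytes in the cell, and the same embedding written with embeddingByteSize=8 occupies 24 bytes.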
Optional parameters
- bigtableMetadataTableTableId: Table ID used for creating the metadata table.
- crowdingTagColumn: The fully qualified column name where the crowding tag is stored, in the format cf:col.
- allowRestrictsMappings: The comma-separated fully qualified column names of the columns to use as the allow restricts, with their aliases, in the format cf:col->alias.
- denyRestrictsMappings: The comma-separated fully qualified column names of the columns to use as the deny restricts, with their aliases, in the format cf:col->alias.
- intNumericRestrictsMappings: The comma-separated fully qualified column names of the columns to use as integer numeric_restricts, with their aliases, in the format cf:col->alias.
- floatNumericRestrictsMappings: The comma-separated fully qualified column names of the columns to use as float (4 bytes) numeric_restricts, with their aliases, in the format cf:col->alias.
- doubleNumericRestrictsMappings: The comma-separated fully qualified column names of the columns to use as double (8 bytes) numeric_restricts, with their aliases, in the format cf:col->alias.
- upsertMaxBatchSize: The maximum number of upserts to buffer before upserting the batch to the Vector Search index. A batch is sent when either upsertMaxBatchSize records are ready or any record has waited for upsertMaxBufferDuration. For example, 10. Defaults to: 10.
- upsertMaxBufferDuration: The maximum delay before a batch of upserts is sent to Vector Search. A batch is sent when either upsertMaxBatchSize records are ready or any record has waited for upsertMaxBufferDuration. Allowed formats are: Ns (for seconds, example: 5s), Nm (for minutes, example: 12m), Nh (for hours, example: 2h). For example, 10s. Defaults to: 10s.
- deleteMaxBatchSize: The maximum number of deletes to buffer before deleting the batch from the Vector Search index. A batch is sent when either deleteMaxBatchSize records are ready or any record has waited for deleteMaxBufferDuration. For example, 10. Defaults to: 10.
- deleteMaxBufferDuration: The maximum delay before a batch of deletes is sent to Vector Search. A batch is sent when either deleteMaxBatchSize records are ready or any record has waited for deleteMaxBufferDuration. Allowed formats are: Ns (for seconds, example: 5s), Nm (for minutes, example: 12m), Nh (for hours, example: 2h). For example, 10s. Defaults to: 10s.
- dlqDirectory: The path to store any unprocessed records with the reason they failed to be processed. Default is a directory under the Dataflow job's temp location. The default value is enough under most conditions.
- bigtableChangeStreamMetadataInstanceId: The Bigtable change streams metadata instance ID. Defaults to empty.
- bigtableChangeStreamMetadataTableTableId: The ID of the Bigtable change streams connector metadata table. If not provided, a Bigtable change streams connector metadata table is automatically created during pipeline execution. Defaults to empty.
- bigtableChangeStreamCharset: The Bigtable change streams charset name. Defaults to: UTF-8.
- bigtableChangeStreamStartTimestamp: The starting timestamp (https://tools.ietf.org/html/rfc3339), inclusive, to use for reading change streams. For example, 2022-05-05T07:59:59Z. Defaults to the timestamp of the pipeline start time.
- bigtableChangeStreamIgnoreColumnFamilies: A comma-separated list of column family name changes to ignore. Defaults to empty.
- bigtableChangeStreamIgnoreColumns: A comma-separated list of column name changes to ignore. Example: "cf1:col1,cf2:col2". Defaults to empty.
- bigtableChangeStreamName: A unique name for the client pipeline. Lets you resume processing from the point at which a previously running pipeline stopped. Defaults to an automatically generated name. See the Dataflow job logs for the value used.
- bigtableChangeStreamResume: When set to true, a new pipeline resumes processing from the point at which a previously running pipeline with the same bigtableChangeStreamName value stopped. If the pipeline with the given bigtableChangeStreamName value has never run, a new pipeline doesn't start. When set to false, a new pipeline starts. If a pipeline with the same bigtableChangeStreamName value has already run for the given source, a new pipeline doesn't start. Defaults to false.
- bigtableReadChangeStreamTimeoutMs: The timeout for Bigtable ReadChangeStream requests in milliseconds.
- bigtableReadProjectId: The Bigtable project ID. The default is the project for the Dataflow job.
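The value formats in the optional parameters above (cf:col->alias mappings, Ns/Nm/Nh durations, and the size-or-delay batching rule) can be sketched in Python as follows. This is an illustration of the documented formats only, not the template's implementation: the function names are hypothetical, and trimming whitespace around list items is an assumption.

```python
import re


def parse_mappings(spec: str) -> dict:
    """Parse 'cf:col->alias,cf2:col2->alias2' into {(family, qualifier): alias}."""
    mappings = {}
    # Assumption: whitespace around items is ignored and empty items are skipped.
    for item in filter(None, (part.strip() for part in spec.split(","))):
        column, arrow, alias = item.partition("->")
        family, colon, qualifier = column.partition(":")
        if not (arrow and colon and family and qualifier and alias):
            raise ValueError(f"expected cf:col->alias, got {item!r}")
        mappings[(family, qualifier)] = alias
    return mappings


_DURATION = re.compile(r"(\d+)([smh])")
_SECONDS_PER_UNIT = {"s": 1, "m": 60, "h": 3600}


def parse_duration_seconds(text: str) -> int:
    """Convert Ns, Nm, or Nh (for example 5s, 12m, 2h) to seconds."""
    match = _DURATION.fullmatch(text)
    if match is None:
        raise ValueError(f"expected Ns, Nm, or Nh, got {text!r}")
    return int(match.group(1)) * _SECONDS_PER_UNIT[match.group(2)]


def should_flush(buffered: int, oldest_wait_seconds: float,
                 max_batch_size: int = 10, max_buffer_seconds: float = 10) -> bool:
    """Apply the documented rule: send when the batch is full or a record has waited long enough.

    The defaults mirror the documented defaults (10 records, 10s).
    """
    if buffered == 0:
        return False
    return buffered >= max_batch_size or oldest_wait_seconds >= max_buffer_seconds
```

With the default settings, a buffer holding three records is sent once its oldest record has waited 10 seconds, and a buffer that reaches 10 records is sent immediately.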
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Bigtable Change Streams to Vector Search template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud CLI
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
embeddingColumn=EMBEDDING_COLUMN,\
embeddingByteSize=EMBEDDING_BYTE_SIZE,\
vectorSearchIndex=VECTOR_SEARCH_INDEX,\
bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_READ_TABLE_ID
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
    - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
    - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
- EMBEDDING_COLUMN: the fully qualified column name where the embeddings are stored, in the format cf:col
- EMBEDDING_BYTE_SIZE: the byte size of each entry in the embeddings array. Can be 4 or 8.
- VECTOR_SEARCH_INDEX: the Vector Search index path
- BIGTABLE_CHANGE_STREAM_APP_PROFILE: the Bigtable application profile ID
- BIGTABLE_READ_INSTANCE_ID: the source Bigtable instance ID
- BIGTABLE_READ_TABLE_ID: the source Bigtable table ID
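Some optional parameters, such as allowRestrictsMappings, take comma-separated values, which collide with the commas that separate key=value pairs in --parameters. The gcloud CLI accepts an alternate delimiter written as a ^DELIM^ prefix (see gcloud topic escaping). The following Python sketch assembles the command above as an argument list and switches to that prefix only when a value contains a comma. The function names and the sample values are hypothetical.

```python
import shlex


def build_parameters_flag(params: dict, delimiter: str = "~") -> str:
    """Join key=value pairs for --parameters.

    Falls back to gcloud's ^DELIM^ alternate-delimiter syntax when any
    value contains a comma.
    """
    pairs = [f"{key}={value}" for key, value in params.items()]
    if not any("," in pair for pair in pairs):
        return ",".join(pairs)
    if any(delimiter in pair for pair in pairs):
        raise ValueError(f"delimiter {delimiter!r} appears in a parameter")
    return f"^{delimiter}^" + delimiter.join(pairs)


def build_command(job_name: str, project: str, region: str,
                  version: str, params: dict) -> list:
    """Return the gcloud argument list for running the template."""
    template = (f"gs://dataflow-templates-{region}/{version}"
                "/flex/Bigtable_Change_Streams_to_Vector_Search")
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        f"--template-file-gcs-location={template}",
        f"--project={project}",
        f"--region={region}",
        f"--parameters={build_parameters_flag(params)}",
    ]


if __name__ == "__main__":
    # Sample values for illustration only.
    command = build_command(
        "my-job", "my-project", "us-central1", "latest",
        {
            "embeddingColumn": "cf:embedding",
            "embeddingByteSize": "4",
            "allowRestrictsMappings": "cf:color->color,cf:size->size",
        },
    )
    print(shlex.join(command))
```

Printing the command with shlex.join quotes each argument for a POSIX shell; passing the list directly to subprocess.run avoids shell quoting altogether.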
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "embeddingColumn": "EMBEDDING_COLUMN",
      "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
      "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
      "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
      "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
      "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
    "environment": { "maxWorkers": "10" }
  }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
    - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
    - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/
- LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
- EMBEDDING_COLUMN: the fully qualified column name where the embeddings are stored, in the format cf:col
- EMBEDDING_BYTE_SIZE: the byte size of each entry in the embeddings array. Can be 4 or 8.
- VECTOR_SEARCH_INDEX: the Vector Search index path
- BIGTABLE_CHANGE_STREAM_APP_PROFILE: the Bigtable application profile ID
- BIGTABLE_READ_INSTANCE_ID: the source Bigtable instance ID
- BIGTABLE_READ_TABLE_ID: the source Bigtable table ID
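The REST request above can be assembled with the Python standard library, as in the following sketch. It only builds the request and does not send it. The function name is hypothetical, and the access token is assumed to come from your own credential flow (for example, the output of gcloud auth print-access-token).

```python
import json
import urllib.request


def build_launch_request(project_id: str, location: str, job_name: str,
                         version: str, parameters: dict,
                         access_token: str) -> urllib.request.Request:
    """Build (but do not send) the flexTemplates:launch POST request."""
    url = (f"https://dataflow.googleapis.com/v1b3/projects/{project_id}"
           f"/locations/{location}/flexTemplates:launch")
    body = {
        "launchParameter": {
            "jobName": job_name,
            "parameters": parameters,
            "containerSpecGcsPath": (
                f"gs://dataflow-templates-{location}/{version}"
                "/flex/Bigtable_Change_Streams_to_Vector_Search"
            ),
            # Mirrors the sample request body shown above.
            "environment": {"maxWorkers": "10"},
        }
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
```

To submit the job, pass the returned object to urllib.request.urlopen and read the JSON response. Building the request separately makes it easy to inspect the URL and body before anything is launched.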