This template creates a streaming pipeline that reads Bigtable data change records and writes them to Vertex AI Vector Search using Dataflow Runner V2.
Pipeline requirements
- The Bigtable source instance must exist.
- The Bigtable source table must exist, and the table must have change streams enabled.
- The Bigtable application profile must exist.
- The Vector Search index path must exist.
Template parameters
Required parameters
- embeddingColumn: The fully qualified column name where the embeddings are stored, in the format cf:col.
- embeddingByteSize: The byte size of each entry in the embeddings array. Use 4 for Float, and 8 for Double. Defaults to: 4.
- vectorSearchIndex: The Vector Search index where changes will be streamed, in the format 'projects/{projectID}/locations/{region}/indexes/{indexID}' (no leading or trailing spaces). For example, projects/123/locations/us-east1/indexes/456.
- bigtableChangeStreamAppProfile: The Bigtable application profile ID. The application profile must use single-cluster routing and allow single-row transactions.
- bigtableReadInstanceId: The source Bigtable instance ID.
- bigtableReadTableId: The source Bigtable table ID.
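The formats described in the parameter list can be checked before a job is launched. The following is a minimal sketch, not part of the template: the function name `check_parameters` and the regular expression are illustrative assumptions derived only from the descriptions above (the cf:col column format, the full index path, and the 4 or 8 byte size).

```python
import re

# Index path format quoted from the vectorSearchIndex description.
# Assumption: no path segment contains "/" or whitespace.
INDEX_PATH = re.compile(r"projects/[^/\s]+/locations/[^/\s]+/indexes/[^/\s]+")

# Parameters listed above without a documented default.
REQUIRED = (
    "embeddingColumn",
    "vectorSearchIndex",
    "bigtableChangeStreamAppProfile",
    "bigtableReadInstanceId",
    "bigtableReadTableId",
)


def check_parameters(params):
    """Return a list of problems found; an empty list means none were found."""
    problems = []
    for name in REQUIRED:
        if not params.get(name):
            problems.append(f"{name} is missing")

    # embeddingColumn must look like family:qualifier (cf:col).
    column = params.get("embeddingColumn", "")
    family, sep, qualifier = column.partition(":")
    if column and not (sep and family and qualifier):
        problems.append("embeddingColumn is not in the format cf:col")

    # fullmatch rejects leading or trailing spaces as well as partial paths.
    index = params.get("vectorSearchIndex", "")
    if index and not INDEX_PATH.fullmatch(index):
        problems.append("vectorSearchIndex is not a full index path")

    # embeddingByteSize defaults to 4 when omitted.
    if str(params.get("embeddingByteSize", "4")) not in ("4", "8"):
        problems.append("embeddingByteSize must be 4 or 8")
    return problems
```

A check like this only catches formatting mistakes. It does not confirm that the instance, table, application profile, or index actually exist.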
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Bigtable Change Streams to Vector Search template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud CLI
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
--template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/ \
--project=PROJECT_ID \
--region=REGION_NAME \
--parameters \
embeddingColumn=EMBEDDING_COLUMN,\
embeddingByteSize=EMBEDDING_BYTE_SIZE,\
vectorSearchIndex=VECTOR_SEARCH_INDEX,\
bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_READ_TABLE_ID
Replace the following:
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
- EMBEDDING_COLUMN: the embedding column
- EMBEDDING_BYTE_SIZE: the byte size of each entry in the embeddings array. Can be 4 or 8.
- VECTOR_SEARCH_INDEX: the Vector Search index path
- BIGTABLE_CHANGE_STREAM_APP_PROFILE: the Bigtable application profile ID
- BIGTABLE_READ_INSTANCE_ID: the source Bigtable instance ID
- BIGTABLE_READ_TABLE_ID: the source Bigtable table ID
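When the job is launched from a script rather than typed by hand, the gcloud command above can be assembled as an argument list, which avoids shell line-continuation mistakes. This is a rough sketch under stated assumptions: `build_gcloud_argv` is a hypothetical helper, and the template path is taken from the caller rather than hard-coded.

```python
import shlex


def build_gcloud_argv(job_name, project_id, region, template_path, parameters):
    """Assemble the argument list for `gcloud dataflow flex-template run`.

    `parameters` maps template parameter names to values. Values containing
    commas are rejected because this sketch does not implement any escaping
    for comma-separated key=value lists.
    """
    for name, value in parameters.items():
        if "," in str(value):
            raise ValueError(f"{name} contains a comma, which this sketch does not escape")
    joined = ",".join(f"{name}={value}" for name, value in parameters.items())
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        f"--template-file-gcs-location={template_path}",
        f"--project={project_id}",
        f"--region={region}",
        "--parameters", joined,
    ]


def as_shell_line(argv):
    """Render the argument list as one safely quoted shell line for review."""
    return shlex.join(argv)
```

The resulting list can be passed to `subprocess.run(argv, check=True)`, or printed with `as_shell_line` and inspected first.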
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "embeddingColumn": "EMBEDDING_COLUMN",
      "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
      "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
      "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
      "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
      "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/",
    "environment": { "maxWorkers": "10" }
  }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/
- LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
- EMBEDDING_COLUMN: the embedding column
- EMBEDDING_BYTE_SIZE: the byte size of each entry in the embeddings array. Can be 4 or 8.
- VECTOR_SEARCH_INDEX: the Vector Search index path
- BIGTABLE_CHANGE_STREAM_APP_PROFILE: the Bigtable application profile ID
- BIGTABLE_READ_INSTANCE_ID: the source Bigtable instance ID
- BIGTABLE_READ_TABLE_ID: the source Bigtable table ID
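Building the request body programmatically helps avoid JSON syntax errors such as a trailing comma. The sketch below only constructs the URL and body shown in the request above and does not send anything. `build_launch_request` is a hypothetical helper, the container spec path is supplied by the caller, and authorization is left out entirely.

```python
import json


def build_launch_request(project_id, location, job_name,
                         container_spec_gcs_path, parameters, max_workers=10):
    """Return (url, json_body) for a flexTemplates:launch request."""
    url = (
        "https://dataflow.googleapis.com/v1b3/"
        f"projects/{project_id}/locations/{location}/flexTemplates:launch"
    )
    body = {
        "launchParameter": {
            "jobName": job_name,
            # Parameter values are written as strings, as in the request above.
            "parameters": {name: str(value) for name, value in parameters.items()},
            "containerSpecGcsPath": container_spec_gcs_path,
            # maxWorkers is also written as a string to mirror the request above.
            "environment": {"maxWorkers": str(max_workers)},
        }
    }
    return url, json.dumps(body, indent=2)
```

The returned body can be sent with any HTTP client that attaches an OAuth 2.0 access token to the POST request.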