Este modelo cria um pipeline de streaming para transmitir registos de alterações de dados do Bigtable e escrevê-los na pesquisa vetorial da Vertex AI através do Dataflow Runner V2.
Requisitos do pipeline
- A instância de origem do Bigtable tem de existir.
- A tabela de origem do Bigtable tem de existir e a tabela tem de ter streams de alterações ativadas.
- O perfil da aplicação Bigtable tem de existir.
- O caminho do índice do Vector Search tem de existir.
Parâmetros de modelos
Parâmetros obrigatórios
- embeddingColumn: o nome da coluna totalmente qualificado onde os encaixes estão armazenados. No formato cf:col.
- embeddingByteSize: o tamanho em bytes de cada entrada na matriz de incorporações. Use 4 para Float e 8 para Double. A predefinição é: 4.
- vectorSearchIndex: o índice do Vector Search onde as alterações vão ser transmitidas em fluxo, no formato "projects/{projectID}/locations/{region}/indexes/{indexID}" (sem espaços no início nem no fim). Por exemplo,
projects/123/locations/us-east1/indexes/456. - bigtableChangeStreamAppProfile: o ID do perfil da aplicação do Bigtable. O perfil da aplicação tem de usar o encaminhamento de cluster único e permitir transações de linha única.
- bigtableReadInstanceId: o ID da instância do Bigtable de origem.
- bigtableReadTableId: o ID da tabela do Bigtable de origem.
Parâmetros opcionais
- bigtableMetadataTableTableId: ID da tabela usado para criar a tabela de metadados.
- crowdingTagColumn: o nome da coluna totalmente qualificado onde a etiqueta de sobreposição está armazenada. No formato cf:col.
- allowRestrictsMappings: os nomes de colunas totalmente qualificados separados por vírgulas das colunas que devem ser usadas como
allowrestrições, com o respetivo alias. No formato cf:col->alias. - denyRestrictsMappings: os nomes das colunas totalmente qualificados separados por vírgulas das colunas que devem ser usadas como
denyrestrições, com o respetivo alias. No formato cf:col->alias. - intNumericRestrictsMappings: os nomes das colunas totalmente qualificados separados por vírgulas das colunas que devem ser usados como número inteiro
numeric_restricts, com o respetivo alias. No formato cf:col->alias. - floatNumericRestrictsMappings: os nomes das colunas totalmente qualificados separados por vírgulas das colunas que devem ser usados como float (4 bytes)
numeric_restricts, com o respetivo alias. No formato cf:col->alias. - doubleNumericRestrictsMappings: os nomes das colunas totalmente qualificados separados por vírgulas das colunas que devem ser usados como duplos (8 bytes)
numeric_restricts, com o respetivo alias. No formato cf:col->alias. - upsertMaxBatchSize: o número máximo de inserções/atualizações a armazenar em buffer antes de inserir/atualizar o lote no índice do Vector Search. Os lotes são enviados quando existem registos upsertBatchSize prontos ou quando passou o tempo upsertBatchDelay de espera de qualquer registo. Por exemplo,
10. A predefinição é: 10. - upsertMaxBufferDuration: o atraso máximo antes de um lote de inserções/atualizações ser enviado para a pesquisa vetorial.Os lotes são enviados quando existem registos upsertBatchSize prontos ou quando qualquer registo está à espera há upsertBatchDelay. Os formatos permitidos são: Ns (para segundos, exemplo: 5s), Nm (para minutos, exemplo: 12m), Nh (para horas, exemplo: 2h). Por exemplo,
10s. A predefinição é: 10 s. - deleteMaxBatchSize: o número máximo de eliminações a colocar em buffer antes de eliminar o lote do índice de pesquisa vetorial. Os lotes são enviados quando existem registos deleteBatchSize prontos ou quando o tempo de espera deleteBatchDelay de qualquer registo tiver passado. Por exemplo,
10. A predefinição é: 10. - deleteMaxBufferDuration: o atraso máximo antes de um lote de eliminações ser enviado para a pesquisa vetorial.Os lotes são enviados quando existem registos deleteBatchSize prontos ou quando qualquer registo está à espera há deleteBatchDelay. Os formatos permitidos são: Ns (para segundos, exemplo: 5s), Nm (para minutos, exemplo: 12m), Nh (para horas, exemplo: 2h). Por exemplo,
10s. A predefinição é: 10 s. - dlqDirectory: o caminho para armazenar todos os registos não processados com o motivo pelo qual não foram processados. A predefinição é um diretório na localização temporária da tarefa do Dataflow. O valor predefinido é suficiente na maioria das condições.
- bigtableChangeStreamMetadataInstanceId: o ID da instância de metadados das streams de alterações do Bigtable. A predefinição é vazio.
- bigtableChangeStreamMetadataTableTableId: o ID da tabela de metadados do conetor de streams de alterações do Bigtable. Se não for fornecida, é criada automaticamente uma tabela de metadados do conector de streams de alterações do Bigtable durante a execução do pipeline. A predefinição é vazio.
- bigtableChangeStreamCharset: o nome do conjunto de carateres das streams de alterações do Bigtable. A predefinição é: UTF-8.
- bigtableChangeStreamStartTimestamp: a data/hora de início (https://tools.ietf.org/html/rfc3339), inclusive, a usar para ler streams de alterações. Por exemplo,
2022-05-05T07:59:59Z. O valor predefinido é a data/hora de início do pipeline. - bigtableChangeStreamIgnoreColumnFamilies: uma lista separada por vírgulas de alterações de nomes de famílias de colunas a ignorar. A predefinição é vazio.
- bigtableChangeStreamIgnoreColumns: uma lista separada por vírgulas de alterações de nomes de colunas a ignorar. Exemplo: "cf1:col1,cf2:col2". A predefinição é vazio.
- bigtableChangeStreamName: um nome exclusivo para o pipeline do cliente. Permite-lhe retomar o processamento a partir do ponto em que um pipeline em execução anterior parou. A predefinição é um nome gerado automaticamente. Consulte os registos da tarefa do Dataflow para ver o valor usado.
- bigtableChangeStreamResume: quando definido como
true, um novo pipeline retoma o processamento a partir do ponto em que um pipeline executado anteriormente com o mesmo valorbigtableChangeStreamNameparou. Se o pipeline com o valorbigtableChangeStreamNameespecificado nunca tiver sido executado, não é iniciado um novo pipeline. Quando definido comofalse, é iniciado um novo pipeline. Se já tiver sido executado um pipeline com o mesmo valor debigtableChangeStreamNamepara a origem especificada, não é iniciado um novo pipeline. A predefinição éfalse. - bigtableReadChangeStreamTimeoutMs: o limite de tempo para pedidos Bigtable ReadChangeStream em milissegundos.
- bigtableReadProjectId: o ID do projeto do Bigtable. A predefinição é o projeto da tarefa do Dataflow.
Execute o modelo
Consola
- Aceda à página Dataflow Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte o artigo Localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the Bigtable Change Streams to Vector Search template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Clique em Executar tarefa.
CLI gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \ --project=PROJECT_ID \ --region=REGION_NAME \ --parameters \ embeddingColumn=EMBEDDING_COLUMN,\ embeddingByteSize=EMBEDDING_BYTE_SIZE,\ vectorSearchIndex=VECTOR_SEARCH_INDEX,\ bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\ bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\
Substitua o seguinte:
JOB_NAME: um nome de tarefa exclusivo à sua escolhaVERSION: a versão do modelo que quer usarPode usar os seguintes valores:
latestpara usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1EMBEDDING_COLUMN: a coluna IncorporaçãoEMBEDDING_BYTE_SIZE: o tamanho em bytes da matriz de incorporações. Pode ser 4 ou 8.VECTOR_SEARCH_INDEX: o caminho do índice do Vector SearchBIGTABLE_CHANGE_STREAM_APP_PROFILE: o ID do perfil da aplicação do BigtableBIGTABLE_READ_INSTANCE_ID: o ID da instância do Bigtable de origemBIGTABLE_READ_TABLE_ID: o ID da tabela do Bigtable de origem
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launchParameter": { "jobName": "JOB_NAME", "parameters": { "embeddingColumn": "EMBEDDING_COLUMN", "embeddingByteSize": "EMBEDDING_BYTE_SIZE", "vectorSearchIndex": "VECTOR_SEARCH_INDEX", "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE", "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID", }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search", "environment": { "maxWorkers": "10" } } }
Substitua o seguinte:
PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME: um nome de tarefa exclusivo à sua escolhaVERSION: a versão do modelo que quer usarPode usar os seguintes valores:
latestpara usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1EMBEDDING_COLUMN: a coluna EmbeddingEMBEDDING_BYTE_SIZE: o tamanho em bytes da matriz de incorporações. Pode ser 4 ou 8.VECTOR_SEARCH_INDEX: o caminho do índice do Vector SearchBIGTABLE_CHANGE_STREAM_APP_PROFILE: o ID do perfil da aplicação do BigtableBIGTABLE_READ_INSTANCE_ID: o ID da instância do Bigtable de origemBIGTABLE_READ_TABLE_ID: o ID da tabela do Bigtable de origem