Streams de alterações do Bigtable para o modelo de pesquisa vetorial

Este modelo cria um pipeline de streaming para transmitir registos de alterações de dados do Bigtable e escrevê-los na pesquisa vetorial da Vertex AI através do Dataflow Runner V2.

Requisitos do pipeline

  • A instância de origem do Bigtable tem de existir.
  • A tabela de origem do Bigtable tem de existir e a tabela tem de ter streams de alterações ativadas.
  • O perfil da aplicação Bigtable tem de existir.
  • O caminho do índice do Vector Search tem de existir.

Parâmetros de modelos

Parâmetros obrigatórios

  • embeddingColumn: o nome da coluna totalmente qualificado onde os encaixes estão armazenados. No formato cf:col.
  • embeddingByteSize: o tamanho em bytes de cada entrada na matriz de incorporações. Use 4 para Float e 8 para Double. A predefinição é: 4.
  • vectorSearchIndex: o índice do Vector Search onde as alterações vão ser transmitidas em fluxo, no formato "projects/{projectID}/locations/{region}/indexes/{indexID}" (sem espaços no início nem no fim). Por exemplo, projects/123/locations/us-east1/indexes/456.
  • bigtableChangeStreamAppProfile: o ID do perfil da aplicação do Bigtable. O perfil da aplicação tem de usar o encaminhamento de cluster único e permitir transações de linha única.
  • bigtableReadInstanceId: o ID da instância do Bigtable de origem.
  • bigtableReadTableId: o ID da tabela do Bigtable de origem.

Parâmetros opcionais

  • bigtableMetadataTableTableId: ID da tabela usado para criar a tabela de metadados.
  • crowdingTagColumn: o nome da coluna totalmente qualificado onde a etiqueta de sobreposição está armazenada. No formato cf:col.
  • allowRestrictsMappings: os nomes de colunas totalmente qualificados separados por vírgulas das colunas que devem ser usadas como allow restrições, com o respetivo alias. No formato cf:col->alias.
  • denyRestrictsMappings: os nomes das colunas totalmente qualificados separados por vírgulas das colunas que devem ser usadas como deny restrições, com o respetivo alias. No formato cf:col->alias.
  • intNumericRestrictsMappings: os nomes das colunas totalmente qualificados separados por vírgulas das colunas que devem ser usados como número inteiro numeric_restricts, com o respetivo alias. No formato cf:col->alias.
  • floatNumericRestrictsMappings: os nomes das colunas totalmente qualificados separados por vírgulas das colunas que devem ser usados como float (4 bytes) numeric_restricts, com o respetivo alias. No formato cf:col->alias.
  • doubleNumericRestrictsMappings: os nomes das colunas totalmente qualificados separados por vírgulas das colunas que devem ser usados como duplos (8 bytes) numeric_restricts, com o respetivo alias. No formato cf:col->alias.
  • upsertMaxBatchSize: o número máximo de inserções/atualizações a armazenar em buffer antes de inserir/atualizar o lote no índice do Vector Search. Os lotes são enviados quando existem registos upsertBatchSize prontos ou quando passou o tempo upsertBatchDelay de espera de qualquer registo. Por exemplo, 10. A predefinição é: 10.
  • upsertMaxBufferDuration: o atraso máximo antes de um lote de inserções/atualizações ser enviado para a pesquisa vetorial.Os lotes são enviados quando existem registos upsertBatchSize prontos ou quando qualquer registo está à espera há upsertBatchDelay. Os formatos permitidos são: Ns (para segundos, exemplo: 5s), Nm (para minutos, exemplo: 12m), Nh (para horas, exemplo: 2h). Por exemplo, 10s. A predefinição é: 10 s.
  • deleteMaxBatchSize: o número máximo de eliminações a colocar em buffer antes de eliminar o lote do índice de pesquisa vetorial. Os lotes são enviados quando existem registos deleteBatchSize prontos ou quando o tempo de espera deleteBatchDelay de qualquer registo tiver passado. Por exemplo, 10. A predefinição é: 10.
  • deleteMaxBufferDuration: o atraso máximo antes de um lote de eliminações ser enviado para a pesquisa vetorial.Os lotes são enviados quando existem registos deleteBatchSize prontos ou quando qualquer registo está à espera há deleteBatchDelay. Os formatos permitidos são: Ns (para segundos, exemplo: 5s), Nm (para minutos, exemplo: 12m), Nh (para horas, exemplo: 2h). Por exemplo, 10s. A predefinição é: 10 s.
  • dlqDirectory: o caminho para armazenar todos os registos não processados com o motivo pelo qual não foram processados. A predefinição é um diretório na localização temporária da tarefa do Dataflow. O valor predefinido é suficiente na maioria das condições.
  • bigtableChangeStreamMetadataInstanceId: o ID da instância de metadados das streams de alterações do Bigtable. A predefinição é vazio.
  • bigtableChangeStreamMetadataTableTableId: o ID da tabela de metadados do conetor de streams de alterações do Bigtable. Se não for fornecida, é criada automaticamente uma tabela de metadados do conector de streams de alterações do Bigtable durante a execução do pipeline. A predefinição é vazio.
  • bigtableChangeStreamCharset: o nome do conjunto de carateres das streams de alterações do Bigtable. A predefinição é: UTF-8.
  • bigtableChangeStreamStartTimestamp: a data/hora de início (https://tools.ietf.org/html/rfc3339), inclusive, a usar para ler streams de alterações. Por exemplo, 2022-05-05T07:59:59Z. O valor predefinido é a data/hora de início do pipeline.
  • bigtableChangeStreamIgnoreColumnFamilies: uma lista separada por vírgulas de alterações de nomes de famílias de colunas a ignorar. A predefinição é vazio.
  • bigtableChangeStreamIgnoreColumns: uma lista separada por vírgulas de alterações de nomes de colunas a ignorar. Exemplo: "cf1:col1,cf2:col2". A predefinição é vazio.
  • bigtableChangeStreamName: um nome exclusivo para o pipeline do cliente. Permite-lhe retomar o processamento a partir do ponto em que um pipeline em execução anterior parou. A predefinição é um nome gerado automaticamente. Consulte os registos da tarefa do Dataflow para ver o valor usado.
  • bigtableChangeStreamResume: quando definido como true, um novo pipeline retoma o processamento a partir do ponto em que um pipeline executado anteriormente com o mesmo valor bigtableChangeStreamName parou. Se o pipeline com o valor bigtableChangeStreamName especificado nunca tiver sido executado, não é iniciado um novo pipeline. Quando definido como false, é iniciado um novo pipeline. Se já tiver sido executado um pipeline com o mesmo valor de bigtableChangeStreamName para a origem especificada, não é iniciado um novo pipeline. A predefinição é false.
  • bigtableReadChangeStreamTimeoutMs: o limite de tempo para pedidos Bigtable ReadChangeStream em milissegundos.
  • bigtableReadProjectId: o ID do projeto do Bigtable. A predefinição é o projeto da tarefa do Dataflow.

Execute o modelo

Consola

  1. Aceda à página Dataflow Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte o artigo Localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Bigtable Change Streams to Vector Search template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

CLI gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

Substitua o seguinte:

  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • EMBEDDING_COLUMN: a coluna Incorporação
  • EMBEDDING_BYTE_SIZE: o tamanho em bytes da matriz de incorporações. Pode ser 4 ou 8.
  • VECTOR_SEARCH_INDEX: o caminho do índice do Vector Search
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: o ID do perfil da aplicação do Bigtable
  • BIGTABLE_READ_INSTANCE_ID: o ID da instância do Bigtable de origem
  • BIGTABLE_READ_TABLE_ID: o ID da tabela do Bigtable de origem

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • EMBEDDING_COLUMN: a coluna Embedding
  • EMBEDDING_BYTE_SIZE: o tamanho em bytes da matriz de incorporações. Pode ser 4 ou 8.
  • VECTOR_SEARCH_INDEX: o caminho do índice do Vector Search
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: o ID do perfil da aplicação do Bigtable
  • BIGTABLE_READ_INSTANCE_ID: o ID da instância do Bigtable de origem
  • BIGTABLE_READ_TABLE_ID: o ID da tabela do Bigtable de origem