Modelo do Pub/Sub para streams de alterações do Bigtable

O modelo de streams de alterações do Bigtable para o Pub/Sub é um pipeline de streaming que transmite registos de alterações de dados do Bigtable e os publica num tópico do Pub/Sub através do Dataflow.

Um fluxo de alterações do Bigtable permite-lhe subscrever mutações de dados por tabela. Quando subscreve streams de alterações de tabelas, aplicam-se as seguintes restrições:

  • Apenas são devolvidas as células modificadas e os descritores das operações de eliminação.
  • Apenas é devolvido o novo valor de uma célula modificada.

Quando os registos de alterações de dados são publicados num tópico do Pub/Sub, as mensagens podem ser inseridas fora de ordem em comparação com a ordenação original da data/hora de confirmação do Bigtable.

Os registos de alterações de dados do Bigtable que não podem ser publicados em tópicos do Pub/Sub são temporariamente colocados num diretório de fila de mensagens rejeitadas (fila de mensagens não processadas) no Cloud Storage. Após o número máximo de novas tentativas sem êxito, estes registos são colocados indefinidamente no mesmo diretório da fila de mensagens rejeitadas para revisão humana ou processamento adicional pelo utilizador.

O pipeline requer que o tópico Pub/Sub de destino exista. O tópico de destino pode estar configurado para validar mensagens através de um esquema. Quando um tópico do Pub/Sub especifica um esquema, o pipeline só é iniciado se o esquema for válido. Consoante o tipo de esquema, use uma das seguintes definições de esquema para o tópico de destino:

Buffers de protocolo

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  

Avro

{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

JSON

Use o seguinte esquema Protobuf com a codificação de mensagens JSON:

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

Cada nova mensagem do Pub/Sub inclui uma entrada de um registo de alteração de dados devolvido pelo fluxo de alterações da respetiva linha na sua tabela do Bigtable. O modelo do Pub/Sub reduz as entradas em cada registo de alteração de dados a alterações individuais ao nível da célula.

Descrição da mensagem de saída do Pub/Sub

Nome do campo Descrição
rowKey A chave da linha alterada. Chega sob a forma de uma matriz de bytes. Quando a codificação de mensagens JSON está configurada, as chaves de linhas são devolvidas como strings. Quando useBase64Rowkeys é especificado, as chaves de linhas são codificadas em Base64. Caso contrário, é usado um conjunto de carateres especificado por bigtableChangeStreamCharset para descodificar bytes de chaves de linhas numa string.
modType O tipo de mutação de linhas. Use um dos seguintes valores: SET_CELL, DELETE_CELLS ou DELETE_FAMILY.
columnFamily A família de colunas afetada pela mutação de linhas.
column O qualificador da coluna afetado pela mutação da linha. Para o tipo de mutação DELETE_FAMILY, o campo da coluna não está definido. Chega sob a forma de uma matriz de bytes. Quando a codificação de mensagens JSON está configurada, as colunas são devolvidas como strings. Quando useBase64ColumnQualifier é especificado, o campo da coluna é codificado em Base64. Caso contrário, é usado um conjunto de carateres especificado por bigtableChangeStreamCharset para descodificar bytes de chaves de linhas numa string.
commitTimestamp A hora em que o Bigtable aplica a mutação. A hora é medida em microssegundos desde a época Unix (1 de janeiro de 1970 às UTC).
timestamp O valor da data/hora da célula afetada pela mutação. Para os tipos de mutação DELETE_CELLS e DELETE_FAMILY, a data/hora não está definida. A hora é medida em microssegundos desde a época Unix (1 de janeiro de 1970 às UTC).
timestampFrom Descreve um início inclusivo do intervalo de indicações de tempo para todas as células eliminadas pela mutação DELETE_CELLS. Para outros tipos de mutações, timestampFrom não está definido. A hora é medida em microssegundos desde a época Unix (1 de janeiro de 1970 às UTC).
timestampTo Descreve um fim exclusivo do intervalo de data/hora para todas as células eliminadas pela mutação DELETE_CELLS. Para outros tipos de mutações, timestampTo não está definido.
isGC Um valor booleano que indica se a mutação é gerada por um mecanismo de recolha de lixo do Bigtable.
tieBreaker Quando duas mutações são registadas ao mesmo tempo por diferentes clusters do Bigtable, a mutação com o valor tiebreaker mais elevado é aplicada à tabela de origem. As mutações com valores de tiebreaker mais baixos são rejeitadas.
value O novo valor definido pela mutação. A menos que a opção de pipeline stripValues esteja definida, o valor é definido para mutações SET_CELL. Para outros tipos de mutações, o valor não é definido. Chega sob a forma de uma matriz de bytes. Quando a codificação de mensagens JSON está configurada, os valores são devolvidos como strings. Quando useBase64Values é especificado, o valor é codificado em Base64. Caso contrário, é usado um conjunto de carateres especificado por bigtableChangeStreamCharset para descodificar bytes de valor numa string.
sourceInstance O nome da instância do Bigtable que registou a mutação. Pode ocorrer quando vários pipelines transmitem alterações de diferentes instâncias para o mesmo tópico do Pub/Sub.
sourceCluster O nome do cluster do Bigtable que registou a mutação. Pode ser usado quando vários pipelines transmitem alterações de diferentes instâncias para o mesmo tópico do Pub/Sub.
sourceTable O nome da tabela do Bigtable que recebeu a mutação. Pode ser usado no caso em que várias condutas transmitem alterações de tabelas diferentes para o mesmo tópico do Pub/Sub.

Requisitos do pipeline

  • A instância de origem do Bigtable especificada.
  • A tabela de origem do Bigtable especificada. A tabela tem de ter streams de alterações ativadas.
  • O perfil da aplicação do Bigtable especificado.
  • O tópico Pub/Sub especificado tem de existir.

Parâmetros de modelos

Parâmetros obrigatórios

  • pubSubTopic: o nome do tópico do Pub/Sub de destino.
  • bigtableChangeStreamAppProfile: o ID do perfil da aplicação do Bigtable. O perfil da aplicação tem de usar o encaminhamento de cluster único e permitir transações de linha única.
  • bigtableReadInstanceId: o ID da instância do Bigtable de origem.
  • bigtableReadTableId: o ID da tabela do Bigtable de origem.

Parâmetros opcionais

  • messageEncoding: a codificação das mensagens a publicar no tópico Pub/Sub. Quando o esquema do tópico de destino está configurado, a codificação de mensagens é determinada pelas definições do tópico. Os seguintes valores são suportados: BINARY e JSON. A predefinição é JSON.
  • messageFormat: a codificação das mensagens a publicar no tópico Pub/Sub. Quando o esquema do tópico de destino está configurado, a codificação de mensagens é determinada pelas definições do tópico. Os seguintes valores são suportados: AVRO, PROTOCOL_BUFFERS e JSON. O valor predefinido é JSON. Quando é usado o formato JSON, os campos rowKey, coluna e valor da mensagem são strings, cujo conteúdo é determinado pelas opções da pipeline useBase64Rowkeys, useBase64ColumnQualifiers, useBase64Values e bigtableChangeStreamCharset.
  • stripValues: quando definido como true, as mutações SET_CELL são devolvidas sem novos valores definidos. A predefinição é false. Este parâmetro é útil quando não precisa de um novo valor presente, também conhecido como invalidação da cache, ou quando os valores são extremamente grandes e excedem os limites de tamanho das mensagens do Pub/Sub.
  • dlqDirectory: o diretório da fila de mensagens rejeitadas. Os registos que não são processados são armazenados neste diretório. A predefinição é um diretório na localização temporária do trabalho do Dataflow. Na maioria dos casos, pode usar o caminho predefinido.
  • dlqRetryMinutes: o número de minutos entre as novas tentativas da fila de mensagens rejeitadas. A predefinição é 10.
  • dlqMaxRetries: o número máximo de repetições de mensagens não entregues. A predefinição é 5.
  • useBase64Rowkeys: usado com a codificação de mensagens JSON. Quando definido como true, o campo rowKey é uma string codificada em Base64. Caso contrário, o rowKey é produzido através da utilização de bigtableChangeStreamCharset para descodificar bytes numa string. A predefinição é false.
  • pubSubProjectId: o ID do projeto do Bigtable. A predefinição é o projeto da tarefa do Dataflow.
  • useBase64ColumnQualifiers: usado com a codificação de mensagens JSON. Quando definido como true, o campo column é uma string codificada em Base64. Caso contrário, a coluna é produzida através de bigtableChangeStreamCharset para descodificar bytes numa string. A predefinição é false.
  • useBase64Values: usado com a codificação de mensagens JSON. Quando definido como true, o campo de valor é uma string codificada em Base64. Caso contrário, o valor é produzido através de bigtableChangeStreamCharset para descodificar bytes numa string. A predefinição é false.
  • disableDlqRetries: indica se as repetições para a DLQ devem ou não ser desativadas. A predefinição é: false.
  • bigtableChangeStreamMetadataInstanceId: o ID da instância de metadados das streams de alterações do Bigtable. A predefinição é vazio.
  • bigtableChangeStreamMetadataTableTableId: o ID da tabela de metadados do conetor de streams de alterações do Bigtable. Se não for fornecida, é criada automaticamente uma tabela de metadados do conector de streams de alterações do Bigtable durante a execução do pipeline. A predefinição é vazio.
  • bigtableChangeStreamCharset: o nome do conjunto de carateres das streams de alterações do Bigtable. A predefinição é: UTF-8.
  • bigtableChangeStreamStartTimestamp: a data/hora de início (https://tools.ietf.org/html/rfc3339), inclusive, a usar para ler streams de alterações. Por exemplo, 2022-05-05T07:59:59Z. O valor predefinido é a data/hora de início do pipeline.
  • bigtableChangeStreamIgnoreColumnFamilies: uma lista separada por vírgulas de alterações de nomes de famílias de colunas a ignorar. A predefinição é vazio.
  • bigtableChangeStreamIgnoreColumns: uma lista separada por vírgulas de alterações de nomes de colunas a ignorar. Exemplo: "cf1:col1,cf2:col2". A predefinição é vazio.
  • bigtableChangeStreamName: um nome exclusivo para o pipeline do cliente. Permite-lhe retomar o processamento a partir do ponto em que um pipeline em execução anterior parou. A predefinição é um nome gerado automaticamente. Consulte os registos da tarefa do Dataflow para ver o valor usado.
  • bigtableChangeStreamResume: quando definido como true, um novo pipeline retoma o processamento a partir do ponto em que um pipeline executado anteriormente com o mesmo valor bigtableChangeStreamName parou. Se o pipeline com o valor bigtableChangeStreamName especificado nunca tiver sido executado, não é iniciado um novo pipeline. Quando definido como false, é iniciado um novo pipeline. Se já tiver sido executado um pipeline com o mesmo valor de bigtableChangeStreamName para a origem especificada, não é iniciado um novo pipeline. A predefinição é false.
  • bigtableReadChangeStreamTimeoutMs: o limite de tempo para pedidos Bigtable ReadChangeStream em milissegundos.
  • bigtableReadProjectId: o ID do projeto do Bigtable. A predefinição é o projeto da tarefa do Dataflow.

Execute o modelo

Consola

  1. Aceda à página Dataflow Criar tarefa a partir de um modelo.
  2. Aceda a Criar tarefa a partir de modelo
  3. No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
  4. Opcional: para Ponto final regional, selecione um valor no menu pendente. A região predefinida é us-central1.

    Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte o artigo Localizações do Dataflow.

  5. No menu pendente Modelo do fluxo de dados, selecione the Bigtable change streams to Pub/Sub template.
  6. Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
  7. Clique em Executar tarefa.

gcloud

Na shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • BIGTABLE_INSTANCE_ID: o ID da instância do Bigtable.
  • BIGTABLE_TABLE_ID: o ID da tabela do Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: o ID do perfil da aplicação do Bigtable.
  • PUBSUB_TOPIC: o nome do tópico de destino do Pub/Sub

API

Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google Cloud
  • JOB_NAME: um nome de tarefa exclusivo à sua escolha
  • VERSION: a versão do modelo que quer usar

    Pode usar os seguintes valores:

  • LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo, us-central1
  • BIGTABLE_INSTANCE_ID: o ID da instância do Bigtable.
  • BIGTABLE_TABLE_ID: o ID da tabela do Bigtable.
  • BIGTABLE_APPLICATION_PROFILE_ID: o ID do perfil da aplicação do Bigtable.
  • PUBSUB_TOPIC: o nome do tópico de destino do Pub/Sub

O que se segue?