O modelo de streams de alterações do Bigtable para o Pub/Sub é um pipeline de streaming que transmite registos de alterações de dados do Bigtable e os publica num tópico do Pub/Sub através do Dataflow.
Um fluxo de alterações do Bigtable permite-lhe subscrever mutações de dados por tabela. Quando subscreve streams de alterações de tabelas, aplicam-se as seguintes restrições:
- Apenas são devolvidas as células modificadas e os descritores das operações de eliminação.
- Apenas é devolvido o novo valor de uma célula modificada.
Quando os registos de alterações de dados são publicados num tópico do Pub/Sub, as mensagens podem ser inseridas fora de ordem em comparação com a ordenação original da data/hora de confirmação do Bigtable.
Os registos de alterações de dados do Bigtable que não podem ser publicados em tópicos do Pub/Sub são temporariamente colocados num diretório de fila de mensagens rejeitadas (fila de mensagens não processadas) no Cloud Storage. Após o número máximo de novas tentativas sem êxito, estes registos são colocados indefinidamente no mesmo diretório da fila de mensagens rejeitadas para revisão humana ou processamento adicional pelo utilizador.
O pipeline requer que o tópico Pub/Sub de destino exista. O tópico de destino pode estar configurado para validar mensagens através de um esquema. Quando um tópico do Pub/Sub especifica um esquema, o pipeline só é iniciado se o esquema for válido. Consoante o tipo de esquema, use uma das seguintes definições de esquema para o tópico de destino:
Buffers de protocolo
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangeLogEntryProto"; message ChangelogEntryProto{ required bytes rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional bytes column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional bytes value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Avro
{ "name" : "ChangelogEntryMessage", "type" : "record", "namespace" : "com.google.cloud.teleport.bigtable", "fields" : [ { "name" : "rowKey", "type" : "bytes"}, { "name" : "modType", "type" : { "name": "ModType", "type": "enum", "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]} }, { "name": "isGC", "type": "boolean" }, { "name": "tieBreaker", "type": "int"}, { "name": "columnFamily", "type": "string"}, { "name": "commitTimestamp", "type" : "long"}, { "name" : "sourceInstance", "type" : "string"}, { "name" : "sourceCluster", "type" : "string"}, { "name" : "sourceTable", "type" : "string"}, { "name": "column", "type" : ["null", "bytes"]}, { "name": "timestamp", "type" : ["null", "long"]}, { "name": "timestampFrom", "type" : ["null", "long"]}, { "name": "timestampTo", "type" : ["null", "long"]}, { "name" : "value", "type" : ["null", "bytes"]} ] }
JSON
Use o seguinte esquema Protobuf com a codificação de mensagens JSON:
syntax = "proto2"; package com.google.cloud.teleport.bigtable; option java_outer_classname = "ChangelogEntryMessageText"; message ChangelogEntryText{ required string rowKey = 1; enum ModType { SET_CELL = 0; DELETE_FAMILY = 1; DELETE_CELLS = 2; UNKNOWN = 3; } required ModType modType = 2; required bool isGC = 3; required int32 tieBreaker = 4; required int64 commitTimestamp = 5; required string columnFamily = 6; optional string column = 7; optional int64 timestamp = 8; optional int64 timestampFrom = 9; optional int64 timestampTo = 10; optional string value = 11; required string sourceInstance = 12; required string sourceCluster = 13; required string sourceTable = 14; }
Cada nova mensagem do Pub/Sub inclui uma entrada de um registo de alteração de dados devolvido pelo fluxo de alterações da respetiva linha na sua tabela do Bigtable. O modelo do Pub/Sub reduz as entradas em cada registo de alteração de dados a alterações individuais ao nível da célula.
Descrição da mensagem de saída do Pub/Sub
| Nome do campo | Descrição |
|---|---|
rowKey |
A chave da linha alterada. Chega sob a forma de uma matriz de bytes. Quando a codificação de mensagens JSON está configurada, as chaves de linhas são devolvidas como strings. Quando useBase64Rowkeys é especificado, as chaves de linhas são codificadas em Base64. Caso contrário, é usado um conjunto de carateres especificado por bigtableChangeStreamCharset para descodificar bytes de chaves de linhas numa string. |
modType |
O tipo de mutação de linhas. Use um dos seguintes valores: SET_CELL, DELETE_CELLS ou DELETE_FAMILY. |
columnFamily |
A família de colunas afetada pela mutação de linhas. |
column |
O qualificador da coluna afetado pela mutação da linha. Para o tipo de mutação DELETE_FAMILY, o campo da coluna não está definido. Chega sob a forma de uma matriz de bytes. Quando a codificação de mensagens JSON está configurada, as colunas são devolvidas como strings. Quando useBase64ColumnQualifier é especificado, o campo da coluna é codificado em Base64. Caso contrário, é usado um conjunto de carateres especificado por bigtableChangeStreamCharset para descodificar bytes de chaves de linhas numa string. |
commitTimestamp |
A hora em que o Bigtable aplica a mutação. A hora é medida em microssegundos desde a época Unix (1 de janeiro de 1970 às UTC). |
timestamp |
O valor da data/hora da célula afetada pela mutação. Para os tipos de mutação DELETE_CELLS e DELETE_FAMILY, a data/hora não está definida. A hora é medida em microssegundos desde a época Unix (1 de janeiro de 1970 às UTC). |
timestampFrom |
Descreve um início inclusivo do intervalo de indicações de tempo para todas as células eliminadas pela mutação DELETE_CELLS. Para outros tipos de mutações, timestampFrom não está definido. A hora é medida em microssegundos desde a época Unix (1 de janeiro de 1970 às UTC). |
timestampTo |
Descreve um fim exclusivo do intervalo de data/hora para todas as células eliminadas pela mutação DELETE_CELLS. Para outros tipos de mutações, timestampTo não está definido. |
isGC |
Um valor booleano que indica se a mutação é gerada por um mecanismo de recolha de lixo do Bigtable. |
tieBreaker |
Quando duas mutações são registadas ao mesmo tempo por diferentes clusters do Bigtable, a mutação com o valor tiebreaker mais elevado é aplicada à tabela de origem. As mutações com valores de tiebreaker mais baixos são rejeitadas. |
value |
O novo valor definido pela mutação. A menos que a opção de pipeline stripValues esteja definida, o valor é definido para mutações SET_CELL. Para outros tipos de mutações, o valor não é definido. Chega sob a forma de uma matriz de bytes. Quando a codificação de mensagens JSON está configurada, os valores são devolvidos como strings.
Quando useBase64Values é especificado, o valor é codificado em Base64. Caso contrário, é usado um conjunto de carateres especificado por bigtableChangeStreamCharset para descodificar bytes de valor numa string. |
sourceInstance |
O nome da instância do Bigtable que registou a mutação. Pode ocorrer quando vários pipelines transmitem alterações de diferentes instâncias para o mesmo tópico do Pub/Sub. |
sourceCluster |
O nome do cluster do Bigtable que registou a mutação. Pode ser usado quando vários pipelines transmitem alterações de diferentes instâncias para o mesmo tópico do Pub/Sub. |
sourceTable |
O nome da tabela do Bigtable que recebeu a mutação. Pode ser usado no caso em que várias condutas transmitem alterações de tabelas diferentes para o mesmo tópico do Pub/Sub. |
Requisitos do pipeline
- A instância de origem do Bigtable especificada.
- A tabela de origem do Bigtable especificada. A tabela tem de ter streams de alterações ativadas.
- O perfil da aplicação do Bigtable especificado.
- O tópico Pub/Sub especificado tem de existir.
Parâmetros de modelos
Parâmetros obrigatórios
- pubSubTopic: o nome do tópico do Pub/Sub de destino.
- bigtableChangeStreamAppProfile: o ID do perfil da aplicação do Bigtable. O perfil da aplicação tem de usar o encaminhamento de cluster único e permitir transações de linha única.
- bigtableReadInstanceId: o ID da instância do Bigtable de origem.
- bigtableReadTableId: o ID da tabela do Bigtable de origem.
Parâmetros opcionais
- messageEncoding: a codificação das mensagens a publicar no tópico Pub/Sub. Quando o esquema do tópico de destino está configurado, a codificação de mensagens é determinada pelas definições do tópico. Os seguintes valores são suportados:
BINARYeJSON. A predefinição éJSON. - messageFormat: a codificação das mensagens a publicar no tópico Pub/Sub. Quando o esquema do tópico de destino está configurado, a codificação de mensagens é determinada pelas definições do tópico. Os seguintes valores são suportados:
AVRO,PROTOCOL_BUFFERSeJSON. O valor predefinido éJSON. Quando é usado o formatoJSON, os campos rowKey, coluna e valor da mensagem são strings, cujo conteúdo é determinado pelas opções da pipelineuseBase64Rowkeys,useBase64ColumnQualifiers,useBase64ValuesebigtableChangeStreamCharset. - stripValues: quando definido como
true, as mutaçõesSET_CELLsão devolvidas sem novos valores definidos. A predefinição éfalse. Este parâmetro é útil quando não precisa de um novo valor presente, também conhecido como invalidação da cache, ou quando os valores são extremamente grandes e excedem os limites de tamanho das mensagens do Pub/Sub. - dlqDirectory: o diretório da fila de mensagens rejeitadas. Os registos que não são processados são armazenados neste diretório. A predefinição é um diretório na localização temporária do trabalho do Dataflow. Na maioria dos casos, pode usar o caminho predefinido.
- dlqRetryMinutes: o número de minutos entre as novas tentativas da fila de mensagens rejeitadas. A predefinição é
10. - dlqMaxRetries: o número máximo de repetições de mensagens não entregues. A predefinição é
5. - useBase64Rowkeys: usado com a codificação de mensagens JSON. Quando definido como
true, o camporowKeyé uma string codificada em Base64. Caso contrário, orowKeyé produzido através da utilização debigtableChangeStreamCharsetpara descodificar bytes numa string. A predefinição éfalse. - pubSubProjectId: o ID do projeto do Bigtable. A predefinição é o projeto da tarefa do Dataflow.
- useBase64ColumnQualifiers: usado com a codificação de mensagens JSON. Quando definido como
true, o campocolumné uma string codificada em Base64. Caso contrário, a coluna é produzida através debigtableChangeStreamCharsetpara descodificar bytes numa string. A predefinição éfalse. - useBase64Values: usado com a codificação de mensagens JSON. Quando definido como
true, o campo de valor é uma string codificada em Base64. Caso contrário, o valor é produzido através debigtableChangeStreamCharsetpara descodificar bytes numa string. A predefinição éfalse. - disableDlqRetries: indica se as repetições para a DLQ devem ou não ser desativadas. A predefinição é: false.
- bigtableChangeStreamMetadataInstanceId: o ID da instância de metadados das streams de alterações do Bigtable. A predefinição é vazio.
- bigtableChangeStreamMetadataTableTableId: o ID da tabela de metadados do conetor de streams de alterações do Bigtable. Se não for fornecida, é criada automaticamente uma tabela de metadados do conector de streams de alterações do Bigtable durante a execução do pipeline. A predefinição é vazio.
- bigtableChangeStreamCharset: o nome do conjunto de carateres das streams de alterações do Bigtable. A predefinição é: UTF-8.
- bigtableChangeStreamStartTimestamp: a data/hora de início (https://tools.ietf.org/html/rfc3339), inclusive, a usar para ler streams de alterações. Por exemplo,
2022-05-05T07:59:59Z. O valor predefinido é a data/hora de início do pipeline. - bigtableChangeStreamIgnoreColumnFamilies: uma lista separada por vírgulas de alterações de nomes de famílias de colunas a ignorar. A predefinição é vazio.
- bigtableChangeStreamIgnoreColumns: uma lista separada por vírgulas de alterações de nomes de colunas a ignorar. Exemplo: "cf1:col1,cf2:col2". A predefinição é vazio.
- bigtableChangeStreamName: um nome exclusivo para o pipeline do cliente. Permite-lhe retomar o processamento a partir do ponto em que um pipeline em execução anterior parou. A predefinição é um nome gerado automaticamente. Consulte os registos da tarefa do Dataflow para ver o valor usado.
- bigtableChangeStreamResume: quando definido como
true, um novo pipeline retoma o processamento a partir do ponto em que um pipeline executado anteriormente com o mesmo valorbigtableChangeStreamNameparou. Se o pipeline com o valorbigtableChangeStreamNameespecificado nunca tiver sido executado, não é iniciado um novo pipeline. Quando definido comofalse, é iniciado um novo pipeline. Se já tiver sido executado um pipeline com o mesmo valor debigtableChangeStreamNamepara a origem especificada, não é iniciado um novo pipeline. A predefinição éfalse. - bigtableReadChangeStreamTimeoutMs: o limite de tempo para pedidos Bigtable ReadChangeStream em milissegundos.
- bigtableReadProjectId: o ID do projeto do Bigtable. A predefinição é o projeto da tarefa do Dataflow.
Execute o modelo
Consola
- Aceda à página Dataflow Criar tarefa a partir de um modelo. Aceda a Criar tarefa a partir de modelo
- No campo Nome da tarefa, introduza um nome exclusivo para a tarefa.
- Opcional: para Ponto final regional, selecione um valor no menu pendente. A região
predefinida é
us-central1.Para ver uma lista das regiões onde pode executar uma tarefa do Dataflow, consulte o artigo Localizações do Dataflow.
- No menu pendente Modelo do fluxo de dados, selecione the Bigtable change streams to Pub/Sub template.
- Nos campos de parâmetros fornecidos, introduza os valores dos parâmetros.
- Clique em Executar tarefa.
gcloud
Na shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \ --parameters \ bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_TABLE_ID,\ bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\ pubSubTopic=PUBSUB_TOPIC
Substitua o seguinte:
PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME: um nome de tarefa exclusivo à sua escolhaVERSION: a versão do modelo que quer usarPode usar os seguintes valores:
latestpara usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
REGION_NAME: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1BIGTABLE_INSTANCE_ID: o ID da instância do Bigtable.BIGTABLE_TABLE_ID: o ID da tabela do Bigtable.BIGTABLE_APPLICATION_PROFILE_ID: o ID do perfil da aplicação do Bigtable.PUBSUB_TOPIC: o nome do tópico de destino do Pub/Sub
API
Para executar o modelo através da API REST, envie um pedido HTTP POST. Para mais informações sobre a API e os respetivos âmbitos de autorização, consulte projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub", "parameters": { "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_TABLE_ID", "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID", "pubSubTopic": "PUBSUB_TOPIC" } } }
Substitua o seguinte:
PROJECT_ID: o ID do projeto onde quer executar a tarefa do Dataflow Google CloudJOB_NAME: um nome de tarefa exclusivo à sua escolhaVERSION: a versão do modelo que quer usarPode usar os seguintes valores:
latestpara usar a versão mais recente do modelo, que está disponível na pasta principal sem data no contentor: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00, para usar uma versão específica do modelo, que pode ser encontrada aninhada na pasta principal com a data correspondente no contentor: gs://dataflow-templates-REGION_NAME/
LOCATION: a região onde quer implementar a tarefa do Dataflow, por exemplo,us-central1BIGTABLE_INSTANCE_ID: o ID da instância do Bigtable.BIGTABLE_TABLE_ID: o ID da tabela do Bigtable.BIGTABLE_APPLICATION_PROFILE_ID: o ID do perfil da aplicação do Bigtable.PUBSUB_TOPIC: o nome do tópico de destino do Pub/Sub
O que se segue?
- Saiba mais sobre os modelos do Dataflow.
- Consulte a lista de modelos fornecidos pela Google.