Partições, registros e consultas de fluxos de alterações

Esta página descreve os fluxo de alterações no Spanner para bancos de dados com dialetos GoogleSQL e PostgreSQL, incluindo:

  • Modelo de particionamento baseado em divisão
  • O formato e o conteúdo dos registros de fluxo de alterações
  • A sintaxe de baixo nível usada para consultar esses registros
  • Exemplo do fluxo de trabalho de consulta

Você usa a API Spanner para consultar fluxo de alterações diretamente. Os aplicativos que usam o Dataflow para ler dados do fluxo de alterações não precisam trabalhar diretamente com o modelo de dados descrito aqui.

Para um guia introdutório mais amplo sobre fluxo de alterações, consulte Visão geral dos streams de alterações.

Partições de fluxo de alterações

Quando uma mudança ocorre em uma tabela monitorada por um fluxo de alterações, o Spanner grava um registro de fluxo de alterações correspondente no banco de dados, de forma síncrona, na mesma transação da mudança de dados. Isso significa que, se a transação for bem-sucedida, o Spanner também terá capturado e persistido a mudança. Internamente, o Spanner aloca o registro do fluxo de alterações e a alteração de dados para que sejam processados pelo mesmo servidor e minimize a sobrecarga de gravação.

Como parte da DML em uma divisão específica, o Spanner anexa a gravação à divisão de dados do stream de mudanças correspondente na mesma transação. Devido a essa colocalização, os fluxos de mudanças não adicionam coordenação extra entre os recursos de serviço, o que minimiza a sobrecarga de confirmação da transação.

imagem

O Spanner é escalonado dividindo e mesclando dados dinamicamente com base na carga e no tamanho do banco de dados, além de distribuir as divisões entre os recursos de serviço.

Para permitir que as gravações e leituras de fluxo de alterações sejam escalonadas, o Spanner divide e mescla o armazenamento interno de fluxos de alterações com os dados do banco de dados, evitando automaticamente os pontos de acesso. Para oferecer suporte à leitura de registros de fluxos de alterações quase em tempo real à medida que as gravações de banco de dados são escalonadas, a API Spanner foi projetada para que um fluxo de alterações seja consultado simultaneamente usando partições de fluxo de alterações. As partições de fluxo de alterações são mapeadas para divisões de dados de fluxo de alterações que contêm os registros de fluxo de alterações. As partições de um fluxo de alterações mudam dinamicamente ao longo do tempo e estão correlacionadas à forma como o Spanner divide e mescla dinamicamente os dados do banco de dados.

Uma partição de fluxo de alterações contém registros de um intervalo de chaves imutável para um intervalo de tempo específico. Qualquer partição do fluxo de alterações pode ser dividida em uma ou mais partições do fluxo de alterações ou mesclada com outras partições do fluxo de alterações. Quando esses eventos de divisão ou fusão acontecem, partições secundárias são criadas para capturar as mudanças nos respectivos intervalos de chaves imutáveis para o próximo intervalo de tempo. Além dos registros de mudança de dados, uma consulta de fluxo de alterações retorna registros de partição filha para notificar os leitores sobre novas partições de fluxo de alterações que precisam ser consultadas, bem como registros de pulsação para indicar o progresso quando nenhuma gravação ocorreu recentemente.

Ao consultar uma partição específica do stream de mudanças, os registros são retornados na ordem do carimbo de data/hora de confirmação. Cada registro de mudança é retornado exatamente uma vez. A ordenação de registros de mudança não é garantida em todas as partições de fluxo de mudanças. Os registros de mudança de uma determinada chave primária são retornados apenas em uma partição para um período específico.

Devido à linhagem de partição pai-filho, para processar mudanças em uma chave específica na ordem de carimbo de data/hora do commit, os registros retornados das partições filhas só devem ser processados depois que os registros de todas as partições principais forem processados.

Mudar funções de leitura de fluxo de alterações e sintaxe de consulta

GoogleSQL

Para consultar fluxo de alterações, use a API ExecuteStreamingSql. O Spanner cria automaticamente uma função de leitura especial junto com o fluxo de alterações. A função de leitura fornece acesso aos registros do fluxo de alterações. A convenção de nomenclatura da função de leitura é READ_change_stream_name.

Supondo que um fluxo de mudanças SingersNameStream exista no banco de dados, a sintaxe de consulta para o GoogleSQL é a seguinte:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

A função "read" aceita os seguintes argumentos:

Nome do argumento Tipo Obrigatório? Descrição
start_timestamp TIMESTAMP Obrigatório Especifica que os registros com commit_timestamp maior ou igual a start_timestamp devem ser retornados. O valor precisa estar dentro do período de armazenamento do fluxo de alterações, ser menor ou igual à hora atual e maior ou igual ao carimbo de data/hora da criação do fluxo de alterações.
end_timestamp TIMESTAMP Opcional (padrão: NULL) Especifica que os registros com um commit_timestamp menor ou igual a end_timestamp devem ser retornados. O valor precisa estar dentro do período de retenção do fluxo de alterações e ser maior ou igual a start_timestamp. A consulta termina depois de retornar todos os ChangeRecords até o end_timestamp ou quando você encerra a conexão. Se end_timestamp estiver definido como NULL ou não for especificado, a consulta continuará a execução até que todos os ChangeRecords sejam retornados ou até que você encerre a conexão.
partition_token STRING Opcional (padrão: NULL) Especifica qual partição de fluxo de alterações consultar, com base no conteúdo dos registros de partições filhas. Se NULL ou não especificado, isso significa que o leitor está consultando o fluxo de mudanças pela primeira vez e não obteve nenhum token de partição específico para consultar.
heartbeat_milliseconds INT64 Obrigatório Determina com que frequência um heartbeat ChangeRecord é retornado caso não haja transações confirmadas nessa partição.

O valor precisa estar entre 1,000 (um segundo) e 300,000 (cinco minutos).
read_options ARRAY Opcional (padrão: NULL) Adiciona opções de leitura reservadas para uso futuro. O único valor permitido é NULL.

Recomendamos criar um método auxiliar para criar o texto da consulta da função de leitura e vincular parâmetros a ela, conforme mostrado no exemplo a seguir.

Java

    private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
    "SELECT ChangeRecord FROM READ_SingersNameStream"
        + "("
        + "   start_timestamp => @startTimestamp,"
        + "   end_timestamp => @endTimestamp,"
        + "   partition_token => @partitionToken,"
        + "   heartbeat_milliseconds => @heartbeatMillis"
        + ")";

    // Helper method to conveniently create change stream query texts and
    // bind parameters.
    public static Statement getChangeStreamQuery(
          String partitionToken,
          Timestamp startTimestamp,
          Timestamp endTimestamp,
          long heartbeatMillis) {
      return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                        .bind("startTimestamp")
                        .to(startTimestamp)
                        .bind("endTimestamp")
                        .to(endTimestamp)
                        .bind("partitionToken")
                        .to(partitionToken)
                        .bind("heartbeatMillis")
                        .to(heartbeatMillis)
                        .build();
    }
    

PostgreSQL

Para consultar fluxo de alterações, use a API ExecuteStreamingSql. O Spanner cria automaticamente uma função de leitura especial junto com o fluxo de alterações. A função de leitura fornece acesso aos registros do fluxo de alterações. A convenção de nomenclatura da função de leitura é spanner.read_json_change_stream_name.

Supondo que um fluxo de mudanças SingersNameStream exista no banco de dados, a sintaxe de consulta para PostgreSQL é a seguinte:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

A função "read" aceita os seguintes argumentos:

Nome do argumento Tipo Obrigatório? Descrição
start_timestamp timestamp with time zone Obrigatório Especifica que os registros de mudança com commit_timestamp maior ou igual a start_timestamp devem ser retornados. O valor precisa estar dentro do período de armazenamento do fluxo de alterações, ser menor ou igual à hora atual e maior ou igual ao carimbo de data/hora da criação do fluxo de alterações.
end_timestamp timestamp with timezone Opcional (padrão: NULL) Especifica que os registros de mudança com commit_timestamp menor ou igual a end_timestamp precisam ser retornados. O valor precisa estar dentro do período de retenção do fluxo de alterações e ser maior ou igual a start_timestamp. A consulta termina depois de retornar todos os registros de mudança até o end_timestamp ou até que você encerre a conexão. Se NULL, a consulta continuará a execução até que todos os registros de mudança sejam retornados ou até que você encerre a conexão.
partition_token text Opcional (padrão: NULL) Especifica qual partição de fluxo de alterações consultar, com base no conteúdo dos registros de partições filhas. Se NULL ou não especificado, isso significa que o leitor está consultando o fluxo de mudanças pela primeira vez e não obteve nenhum token de partição específico para consultar.
heartbeat_milliseconds bigint Obrigatório Determina com que frequência um heartbeat ChangeRecord é retornado quando não há transações confirmadas nessa partição. O valor precisa estar entre 1,000 (um segundo) e 300,000 (cinco minutos).
null null Obrigatório Reservado para uso futuro

Recomendamos criar um método auxiliar para criar o texto da função de leitura e vincular parâmetros a ele, conforme mostrado no exemplo a seguir.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and
// bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

Formato do registro do fluxo de alterações

GoogleSQL

A função de leitura de fluxo de alterações retorna uma única coluna ChangeRecord do tipo ARRAY<STRUCT<...>>. Em cada linha, essa matriz sempre contém um único elemento.

Os elementos da matriz têm o seguinte tipo:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Há três campos nesse STRUCT: data_change_record, heartbeat_record e child_partitions_record, cada um do tipo ARRAY<STRUCT<...>>. Em qualquer linha retornada pela função de leitura do stream de mudanças, apenas um desses três campos contém um valor. Os outros dois estão vazios ou são NULL. Esses campos de matriz contêm, no máximo, um elemento.

As seções a seguir analisam cada um desses três tipos de registros.

PostgreSQL

A função de leitura de fluxo de alterações retorna uma única coluna ChangeRecord do tipo JSON com a seguinte estrutura:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

Há três chaves possíveis nesse objeto: data_change_record, heartbeat_record e child_partitions_record. O tipo de valor correspondente é JSON. Em qualquer linha retornada pela função de leitura do fluxo de alterações, apenas uma dessas três chaves existe.

As seções a seguir analisam cada um desses três tipos de registros.

Registros de alteração de dados

Um registro de alteração de dados contém um conjunto de mudanças em uma tabela com o mesmo tipo de modificação (inserção, atualização ou exclusão) confirmadas no mesmo carimbo de data/hora de confirmação em uma partição de fluxo de alterações para a mesma transação. Vários registros de mudança de dados podem ser retornados para a mesma transação em várias partições de fluxo de alterações.

Todos os registros de alteração de dados têm campos commit_timestamp, server_transaction_id e record_sequence, que juntos determinam a ordem no fluxo de alterações de um registro de fluxo. Esses três campos são suficientes para derivar a ordenação das mudanças e fornecer consistência externa.

Várias transações podem ter o mesmo carimbo de data/hora de confirmação se acessarem dados não sobrepostos. O campo server_transaction_id permite distinguir qual conjunto de mudanças (potencialmente em partições de fluxo de mudanças) foi emitido na mesma transação. Ao combinar com os campos record_sequence e number_of_records_in_transaction, você também pode armazenar em buffer e ordenar todos os registros de uma transação específica.

Os campos de um registro de mudança de dados incluem o seguinte:

GoogleSQL

Campo Tipo Descrição
commit_timestamp TIMESTAMP Indica o carimbo de data/hora em que a mudança foi confirmada.
record_sequence STRING Indica o número de sequência do registro na transação. Os números de sequência são exclusivos e aumentam de maneira uniforme (mas não necessariamente contíguos) em uma transação. Ordene os registros do mesmo server_transaction_id por record_sequence para reconstruir a ordem das mudanças na transação. O Spanner pode otimizar essa ordenação para melhorar a performance, e nem sempre ela vai corresponder à ordenação original fornecida.
server_transaction_id STRING Fornece uma string globalmente exclusiva que representa a transação em que a mudança foi confirmada. O valor só deve ser usado no contexto de processamento de registros de fluxo de alterações e não está correlacionado ao ID da transação na API do Spanner.
is_last_record_in_transaction_in_partition BOOL Indica se este é o último registro de uma transação na partição atual.
table_name STRING Nome da tabela afetada pela mudança.
value_capture_type STRING

Descreve o tipo de captura de valor especificado na configuração do fluxo de mudanças quando essa mudança foi capturada.

O tipo de captura de valor pode ser um dos seguintes:

  • OLD_AND_NEW_VALUES
  • NEW_ROW
  • NEW_VALUES
  • NEW_ROW_AND_OLD_VALUES

Por padrão, ele é OLD_AND_NEW_VALUES. Para mais informações, consulte tipos de captura de valor.

column_types
[
  {
      "name": "STRING",
      "type": {
        "code": "STRING"
      },
      "is_primary_key": BOOLEAN
      "ordinal_position": NUMBER
    },
    ...
]
Indica o nome da coluna, o tipo dela, se é uma chave primária e a posição da coluna conforme definido no esquema (ordinal_position). A primeira coluna de uma tabela no esquema teria uma posição ordinal de 1. O tipo de coluna pode ser aninhado para colunas de matriz. O formato corresponde à estrutura de tipo descrita na referência da API Spanner.
mods
[
  {
    "keys": {"STRING" : "STRING"},
    "new_values": {
      "STRING" : "VALUE-TYPE",
      [...]
    },
    "old_values": {
      "STRING" : "VALUE-TYPE",
      [...]
    },
  },
  [...]
]
Descreve as mudanças feitas, incluindo os valores da chave primária, os valores antigos e os novos valores das colunas alteradas ou rastreadas. A disponibilidade e o conteúdo dos valores antigos e novos dependem do value_capture_type configurado. Os campos new_values e old_values contêm apenas as colunas sem chave.
mod_type STRING Descreve o tipo de mudança. Um de INSERT, UPDATE ou DELETE.
number_of_records_in_transaction INT64 Indica o número de registros de alteração de dados que fazem parte desta transação em todas as partições de fluxo de alterações.
number_of_partitions_in_transaction INT64 Indica o número de partições que retornam registros de alteração de dados para essa transação.
transaction_tag STRING Indica a tag de transação associada a esta transação.
is_system_transaction BOOL Indica se a transação é do sistema.

PostgreSQL

Campo Tipo Descrição
commit_timestamp STRING Indica o carimbo de data/hora em que a mudança foi confirmada.
record_sequence STRING Indica o número de sequência do registro na transação. Os números de sequência são exclusivos e aumentam de maneira uniforme (mas não necessariamente contíguos) em uma transação. Ordene os registros do mesmo server_transaction_id por record_sequence para reconstruir a ordem das mudanças na transação.
server_transaction_id STRING Fornece uma string globalmente exclusiva que representa a transação em que a mudança foi confirmada. O valor só deve ser usado no contexto de processamento de registros de fluxo de alterações e não está correlacionado ao ID da transação na API do Spanner.
is_last_record_in_transaction_in_partition BOOLEAN Indica se este é o último registro de uma transação na partição atual.
table_name STRING Indica o nome da tabela afetada pela mudança.
value_capture_type STRING

Descreve o tipo de captura de valor especificado na configuração do fluxo de mudanças quando essa mudança foi capturada.

O tipo de captura de valor pode ser um dos seguintes:

  • OLD_AND_NEW_VALUES
  • NEW_ROW
  • NEW_VALUES
  • NEW_ROW_AND_OLD_VALUES

Por padrão, ele é OLD_AND_NEW_VALUES. Para mais informações, consulte tipos de captura de valor.

column_types
[
  {
      "name": "STRING",
      "type": {
        "code": "STRING"
      },
      "is_primary_key": BOOLEAN
      "ordinal_position": NUMBER
    },
    ...
]
Indica o nome da coluna, o tipo dela, se é uma chave primária e a posição da coluna conforme definido no esquema (ordinal_position). A primeira coluna de uma tabela no esquema teria uma posição ordinal de 1. O tipo de coluna pode ser aninhado para colunas de matriz. O formato corresponde à estrutura de tipo descrita na referência da API do Spanner.
mods
[
  {
    "keys": {"STRING" : "STRING"},
    "new_values": {
      "STRING" : "VALUE-TYPE",
      [...]
    },
    "old_values": {
      "STRING" : "VALUE-TYPE",
      [...]
    },
  },
  [...]
]
Descreve as mudanças feitas, incluindo os valores da chave primária, os valores antigos e os novos valores das colunas alteradas ou rastreadas. A disponibilidade e o conteúdo dos valores antigos e novos dependem do value_capture_type configurado. Os campos new_values e old_values contêm apenas as colunas não principais.
mod_type STRING Descreve o tipo de mudança. Um de INSERT, UPDATE ou DELETE.
number_of_records_in_transaction INT64 Indica o número de registros de alteração de dados que fazem parte desta transação em todas as partições de fluxo de alterações.
number_of_partitions_in_transaction NUMBER Indica o número de partições que retornam registros de alteração de dados para essa transação.
transaction_tag STRING Indica a tag de transação associada a esta transação.
is_system_transaction BOOLEAN Indica se a transação é do sistema.

Exemplo de registro de alteração de dados

Confira a seguir um par de exemplos de registros de alteração de dados. Elas descrevem uma única transação em que há uma transferência entre duas contas. As duas contas estão em partições separadas de fluxos de mudanças.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

O registro de mudança de dados a seguir é um exemplo de registro com o tipo de captura de valor NEW_VALUES. Somente os novos valores são preenchidos. Apenas a coluna LastUpdate foi modificada, então somente ela foi retornada.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

O registro de mudança de dados a seguir é um exemplo de registro com o tipo de captura de valor NEW_ROW. Apenas a coluna LastUpdate foi modificada, mas todas as colunas rastreadas são retornadas.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

O registro de mudança de dados a seguir é um exemplo de registro com o tipo de captura de valor NEW_ROW_AND_OLD_VALUES. Apenas a coluna LastUpdate foi modificada, mas todas as colunas rastreadas são retornadas. Esse tipo de captura de valor captura o valor novo e o antigo de LastUpdate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Registros de batimentos cardíacos

Quando um registro de pulsação é retornado, isso indica que todas as mudanças com commit_timestamp menor ou igual ao timestamp do registro de pulsação foram retornadas, e os registros de dados futuros nessa partição precisam ter carimbos de data/hora de confirmação maiores que os retornados pelo registro de pulsação. Os registros de pulsação são retornados quando não há alterações de dados gravadas em uma partição. Quando há mudanças de dados gravadas na partição, data_change_record.commit_timestamp pode ser usado em vez de heartbeat_record.timestamp para informar que o leitor está avançando na leitura da partição.

Você pode usar registros de pulsação retornados em partições para sincronizar leitores em todas as partições. Depois que todos os leitores receberem um heartbeat maior ou igual a algum carimbo de data/hora A ou receberem dados ou registros de partição secundária maiores ou iguais ao carimbo de data/hora A, os leitores saberão que receberam todos os registros confirmados no carimbo de data/hora A ou antes dele e poderão começar a processar os registros armazenados em buffer. Por exemplo, classificar os registros entre partições por carimbo de data/hora e agrupá-los por server_transaction_id.

Um registro de pulsação contém apenas um campo:

GoogleSQL

Campo Tipo Descrição
timestamp TIMESTAMP Indica o carimbo de data/hora do registro de pulsação.

PostgreSQL

Campo Tipo Descrição
timestamp STRING Indica o carimbo de data/hora do registro de pulsação.

Exemplo de registro de pulsação

Um exemplo de registro de pulsação, comunicando que todos os registros com carimbos de data/hora menores ou iguais ao carimbo de data/hora deste registro foram retornados:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Registros de partição secundária

Os registros de partições filhas retornam informações sobre elas: tokens de partição, tokens das partições principais e o start_timestamp que representa o carimbo de data/hora mais antigo em que as partições filhas contêm registros de mudança. Os registros com carimbos de data/hora de confirmação imediatamente anteriores a child_partitions_record.start_timestamp são retornados na partição atual. Depois de retornar todos os registros de partição filha para essa partição, essa consulta retorna com um status de sucesso, indicando que todos os registros foram retornados para essa partição.

Os campos de um registro de partição secundária incluem o seguinte:

GoogleSQL

Campo Tipo Descrição
start_timestamp TIMESTAMP Indica que os registros de mudança de dados retornados de partições filhas neste registro de partição filha têm um carimbo de data/hora de confirmação maior ou igual a start_timestamp. Ao consultar uma partição filha, a consulta precisa especificar o token da partição filha e um start_timestamp maior ou igual a child_partitions_token.start_timestamp. Todos os registros de partições filhas retornados por uma partição têm o mesmo start_timestamp, e o carimbo de data/hora sempre fica entre o start_timestamp e o end_timestamp especificados da consulta.
record_sequence STRING Indica um número de sequência monotonicamente crescente que pode ser usado para definir a ordenação dos registros de partição filha quando há vários registros de partição filha retornados com o mesmo start_timestamp em uma partição específica. O token de partição, start_timestamp e record_sequence identificam exclusivamente um registro de partição secundária.
child_partitions
[
  {
    "token" : "STRING",
    "parent_partition_tokens" : ["STRING"]
  }
]
Retorna um conjunto de partições secundárias e as informações associadas a elas. Isso inclui a string de token de partição usada para identificar a partição filha em consultas, bem como os tokens das partições pai.

PostgreSQL

Campo Tipo Descrição
start_timestamp STRING Indica que os registros de mudança de dados retornados de partições filhas neste registro de partição filha têm um carimbo de data/hora de confirmação maior ou igual a start_timestamp. Ao consultar uma partição filha, a consulta precisa especificar o token da partição filha e um start_timestamp maior ou igual a child_partitions_token.start_timestamp. Todos os registros de partições filhas retornados por uma partição têm o mesmo start_timestamp, e o carimbo de data/hora sempre fica entre o start_timestamp e o end_timestamp especificados da consulta.
record_sequence STRING Indica um número de sequência monotonicamente crescente que pode ser usado para definir a ordenação dos registros de partição filha quando há vários registros de partição filha retornados com o mesmo start_timestamp em uma partição específica. O token de partição, start_timestamp e record_sequence identificam exclusivamente um registro de partição secundária.
child_partitions
[
  {
    "token": "STRING",
    "parent_partition_tokens": ["STRING"],
  }, [...]
]
Retorna uma matriz de partições secundárias e as informações associadas a elas. Isso inclui a string de token de partição usada para identificar a partição filha em consultas, bem como os tokens das partições principais.

Exemplo de registro de partição secundária

Confira a seguir um exemplo de registro de partição secundária:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

Fluxo de trabalho de consulta de fluxos de alterações

Execute consultas de fluxo de alterações usando a API ExecuteStreamingSql, com uma transação somente leitura de uso único e um vínculo de carimbo de data/hora forte. A função de leitura de fluxo de mudanças permite especificar o start_timestamp e end_timestamp para o período de tempo de interesse. Todos os registros de mudança dentro do período de armazenamento são acessíveis usando o limite de carimbo de data/hora somente leitura forte.

Todos os outros TransactionOptions são inválidos para consultas de fluxo de alterações. Além disso, se TransactionOptions.read_only.return_read_timestamp for definido como true, um valor especial de kint64max - 1 será retornado na mensagem Transaction que descreve a transação, em vez de um carimbo de data/hora de leitura válido. Esse valor especial precisa ser descartado e não pode ser usado em consultas subsequentes.

Cada consulta de fluxo de alterações pode retornar qualquer número de linhas, cada uma contendo um registro de alteração de dados, um registro de pulsação ou um registro de partições secundárias. Não é necessário definir um prazo para o pedido.

Exemplo de fluxo de trabalho de consulta de fluxo de alterações

O fluxo de trabalho de consulta de streaming começa com a emissão da primeira consulta de fluxo de mudanças especificando o partition_token como NULL. A consulta precisa especificar a função de leitura do fluxo de alterações, o carimbo de data/hora de início e fim de interesse e o intervalo de pulsação. Quando end_timestamp é NULL, a consulta continua retornando mudanças de dados até o fim da partição.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

Processa registros de dados dessa consulta até que todos os registros de partição filha sejam retornados. No exemplo a seguir, dois registros de partição filhos e três tokens de partição são retornados, e a consulta é encerrada. Os registros de partição secundária de uma consulta específica sempre compartilham o mesmo start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": "1000012389",
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": "1000012390",
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

Para processar as mudanças após 2022-05-01T09:00:01Z, crie três novas consultas e execute-as em paralelo. Usadas juntas, as três consultas retornam mudanças de dados para o mesmo intervalo de chaves que o pai delas abrange. Sempre defina o start_timestamp como o start_timestamp no mesmo registro de partição secundária e use o mesmo end_timestamp e intervalo de pulsação para processar os registros de forma consistente em todas as consultas.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

A consulta em child_token_2 termina depois de retornar outro registro de partição filha. Esse registro indica que uma nova partição está abrangendo mudanças para child_token_2 e child_token_3 a partir de 2022-05-01T09:30:15Z. O mesmo registro é retornado pela consulta em child_token_3, porque ambos são as partições principais do novo child_token_4. Para garantir um processamento estrito e ordenado de registros de dados para uma determinada chave, a consulta em child_token_4 precisa começar depois que todos os pais terminarem. Neste caso, os pais são child_token_2 e child_token_3. Crie apenas uma consulta para cada token de partição filho. O design do fluxo de trabalho de consulta deve designar um pai para aguardar e programar a consulta em child_token_4.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": "1000012389",
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": ["child_token_2", "child_token_3"],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Encontre exemplos de processamento e análise de registros de fluxo de alterações no conector do Dataflow SpannerIO do Apache Beam no GitHub.