Modificare partizioni, record e query dello stream

Questa pagina descrive modifiche in tempo reale in Spanner per i database con dialetto GoogleSQL e PostgreSQL, tra cui:

  • Modello di partizionamento basato sulla suddivisione
  • Il formato e il contenuto dei record dello stream di modifiche
  • La sintassi di basso livello utilizzata per eseguire query su questi record
  • Un esempio di workflow della query

Utilizzi l'API Spanner per interrogare modifiche in tempo reale di modifiche. Le applicazioni che utilizzano Dataflow per leggere i dati delle modifiche in tempo reale non devono interagire direttamente con il modello dei dati descritto qui.

Per una guida introduttiva più ampia agli modifiche in tempo reale, consulta Panoramica degli stream di modifiche.

Modificare le partizioni del flusso di modifiche

Quando si verifica una modifica in una tabella monitorata da uno stream di modifiche, Spanner scrive un record di stream di modifiche corrispondente nel database, in modo sincrono nella stessa transazione della modifica dei dati. Ciò significa che se la transazione va a buon fine, Spanner ha anche acquisito e reso persistente la modifica. Internamente, Spanner colloca il record del flusso di modifiche e la modifica dei dati in modo che vengano elaborati dallo stesso server per ridurre al minimo il sovraccarico di scrittura.

Nell'ambito del DML a una determinata suddivisione, Spanner aggiunge la scrittura alla suddivisione dei dati dello stream delle modifiche corrispondente nella stessa transazione. Grazie a questa collocazione, i flussi di modifiche non richiedono un coordinamento aggiuntivo tra le risorse di pubblicazione, il che riduce al minimo l'overhead del commit delle transazioni.

immagine

Spanner esegue lo scale up dividendo e unendo dinamicamente i dati in base al carico e alle dimensioni del database e distribuendo le suddivisioni tra le risorse di servizio.

Per consentire la scalabilità delle scritture e delle letture dei modifiche in tempo reale, Spanner divide e unisce lo spazio di archiviazione interno dei flussi di modifiche insieme ai dati del database, evitando automaticamente gli hotspot. Per supportare la lettura dei record di modifica in tempo reale quasi in tempo reale man mano che le scritture del database vengono scalate, l&#39API Spannerer è progettata per consentire l'esecuzione di query simultanee su un flusso di modifiche utilizzando le partizioni del flusso di modifiche. Mappa le partizioni del flusso di modifiche per dividere i dati del flusso di modifiche che contengono i record del flusso di modifiche. Le partizioni di un flusso di modifiche cambiano dinamicamente nel tempo e sono correlate al modo in cui Spanner suddivide e unisce dinamicamente i dati del database.

Una partizione del flusso di modifiche contiene record per un intervallo di chiavi immutabile per un intervallo di tempo specifico. Qualsiasi partizione dello stream delle modifiche può essere suddivisa in una o più partizioni dello stream delle modifiche oppure unita ad altre partizioni dello stream delle modifiche. Quando si verificano questi eventi di suddivisione o unione, vengono create partizioni secondarie per acquisire le modifiche per i rispettivi intervalli di chiavi immutabili per l'intervallo di tempo successivo. Oltre ai record di modifica dei dati, una query di flusso di modifiche restituisce record di partizioni secondarie per notificare ai lettori le nuove partizioni del flusso di modifiche da interrogare, nonché record heartbeat per indicare l'avanzamento quando non sono state eseguite scritture di recente.

Quando esegui una query su una determinata partizione dello stream delle modifiche, i record delle modifiche vengono restituiti in ordine di timestamp di commit. Ogni record di modifica viene restituito esattamente una volta. L'ordinamento dei record di modifica non è garantito tra le partizioni del flusso di modifiche. I record delle modifiche per una determinata chiave primaria vengono restituiti solo in una partizione per un determinato intervallo di tempo.

A causa della derivazione delle partizioni padre-figlio, per elaborare le modifiche per una determinata chiave in ordine di timestamp di commit, i record restituiti dalle partizioni figlio devono essere elaborati solo dopo che sono stati elaborati i record di tutte le partizioni padre.

Modificare le funzioni di lettura delle modifiche in tempo reale e la sintassi delle query

GoogleSQL

Per eseguire query sui modifiche in tempo reale, utilizza l'API ExecuteStreamingSql. Spanner crea automaticamente una funzione di lettura speciale insieme allo stream delle modifiche. La funzione di lettura fornisce l'accesso ai record dello stream di modifiche. La convenzione di denominazione della funzione di lettura è READ_change_stream_name.

Supponendo che nel database esista uno stream di modifiche SingersNameStream, la sintassi della query per GoogleSQL è la seguente:

SELECT ChangeRecord
FROM READ_SingersNameStream (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    read_options
)

La funzione di lettura accetta i seguenti argomenti:

Nome argomento Tipo Obbligatorio? Descrizione
start_timestamp TIMESTAMP Obbligatorio Specifica che devono essere restituiti i record con commit_timestamp maggiore o uguale a start_timestamp. Il valore deve rientrare nel periodo di conservazione dello stream delle modifiche e deve essere minore o uguale all'ora corrente e maggiore o uguale al timestamp di creazione dello stream delle modifiche.
end_timestamp TIMESTAMP Facoltativo (valore predefinito: NULL) Specifica che devono essere restituiti i record con un valore commit_timestamp minore o uguale a end_timestamp. Il valore deve rientrare nel periodo di conservazione dello stream di modifiche ed essere maggiore o uguale a start_timestamp. La query termina dopo aver restituito tutti i ChangeRecords fino a end_timestamp o quando termini la connessione. Se end_timestamp è impostato su NULL o non è specificato, l'esecuzione della query continua finché non vengono restituiti tutti i ChangeRecords o finché non termini la connessione.
partition_token STRING Facoltativo (valore predefinito: NULL) Specifica la partizione dello stream delle modifiche da interrogare in base al contenuto dei record delle partizioni secondarie. Se NULL o non specificato, significa che il lettore sta eseguendo una query sul flusso di modifiche per la prima volta e non ha ottenuto token di partizione specifici da cui eseguire query.
heartbeat_milliseconds INT64 Obbligatorio Determina la frequenza con cui viene restituito un battito cardiaco ChangeRecord nel caso in cui non siano state eseguite transazioni in questa partizione.

Il valore deve essere compreso tra 1,000 (un secondo) e 300,000 (cinque minuti).
read_options ARRAY Facoltativo (valore predefinito: NULL) Aggiunge opzioni di lettura riservate per un utilizzo futuro. L'unico valore consentito è NULL.

Ti consigliamo di creare un metodo helper per creare il testo della query della funzione di lettura e associare i parametri, come mostrato nell'esempio seguente.

Java

    private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
    "SELECT ChangeRecord FROM READ_SingersNameStream"
        + "("
        + "   start_timestamp => @startTimestamp,"
        + "   end_timestamp => @endTimestamp,"
        + "   partition_token => @partitionToken,"
        + "   heartbeat_milliseconds => @heartbeatMillis"
        + ")";

    // Helper method to conveniently create change stream query texts and
    // bind parameters.
    public static Statement getChangeStreamQuery(
          String partitionToken,
          Timestamp startTimestamp,
          Timestamp endTimestamp,
          long heartbeatMillis) {
      return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                        .bind("startTimestamp")
                        .to(startTimestamp)
                        .bind("endTimestamp")
                        .to(endTimestamp)
                        .bind("partitionToken")
                        .to(partitionToken)
                        .bind("heartbeatMillis")
                        .to(heartbeatMillis)
                        .build();
    }
    

PostgreSQL

Per eseguire query sui modifiche in tempo reale, utilizza l'API ExecuteStreamingSql. Spanner crea automaticamente una funzione di lettura speciale insieme allo stream delle modifiche. La funzione di lettura fornisce l'accesso ai record dello stream di modifiche. La convenzione di denominazione della funzione di lettura è spanner.read_json_change_stream_name.

Supponendo che nel database esista uno stream di modifiche SingersNameStream, la sintassi della query per PostgreSQL è la seguente:

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
    start_timestamp,
    end_timestamp,
    partition_token,
    heartbeat_milliseconds,
    null
)

La funzione di lettura accetta i seguenti argomenti:

Nome argomento Tipo Obbligatorio? Descrizione
start_timestamp timestamp with time zone Obbligatorio Specifica che devono essere restituiti i record di modifica con commit_timestamp maggiore o uguale a start_timestamp. Il valore deve rientrare nel periodo di conservazione dello stream delle modifiche e deve essere minore o uguale all'ora corrente e maggiore o uguale al timestamp di creazione dello stream delle modifiche.
end_timestamp timestamp with timezone Facoltativo (valore predefinito: NULL) Specifica che devono essere restituiti i record di modifica con commit_timestamp minore o uguale a end_timestamp. Il valore deve rientrare nel periodo di conservazione dello stream di modifiche ed essere maggiore o uguale a start_timestamp. La query termina dopo aver restituito tutti i record di modifica fino al giorno end_timestamp o finché non interrompi la connessione. Se NULL, la query continua l'esecuzione finché non vengono restituiti tutti i record di modifica o finché non termini la connessione.
partition_token text Facoltativo (valore predefinito: NULL) Specifica la partizione dello stream delle modifiche da interrogare in base al contenuto dei record delle partizioni secondarie. Se NULL o non specificato, significa che il lettore sta eseguendo una query sul flusso di modifiche per la prima volta e non ha ottenuto token di partizione specifici da cui eseguire query.
heartbeat_milliseconds bigint Obbligatorio Determina la frequenza con cui viene restituito un heartbeat ChangeRecord quando non vengono eseguite transazioni in questa partizione. Il valore deve essere compreso tra 1,000 (un secondo) e 300,000 (cinque minuti).
null null Obbligatorio Riservato per l'uso futuro

Ti consigliamo di creare un metodo helper per creare il testo della funzione di lettura e associare i parametri, come mostrato nell'esempio seguente.

Java

private static final String SINGERS_NAME_STREAM_QUERY_TEMPLATE =
        "SELECT * FROM \"spanner\".\"read_json_SingersNameStream\""
            + "($1, $2, $3, $4, null)";

// Helper method to conveniently create change stream query texts and
// bind parameters.
public static Statement getChangeStreamQuery(
      String partitionToken,
      Timestamp startTimestamp,
      Timestamp endTimestamp,
      long heartbeatMillis) {

  return Statement.newBuilder(SINGERS_NAME_STREAM_QUERY_TEMPLATE)
                    .bind("p1")
                    .to(startTimestamp)
                    .bind("p2")
                    .to(endTimestamp)
                    .bind("p3")
                    .to(partitionToken)
                    .bind("p4")
                    .to(heartbeatMillis)
                    .build();
}

Formato dei record dei flussi di modifiche

GoogleSQL

La funzione di lettura dei modifiche in tempo reale restituisce una singola colonna ChangeRecord di tipo ARRAY<STRUCT<...>>. In ogni riga, questo array contiene sempre un singolo elemento.

Gli elementi dell'array hanno il seguente tipo:

STRUCT <
  data_change_record ARRAY<STRUCT<...>>,
  heartbeat_record ARRAY<STRUCT<...>>,
  child_partitions_record ARRAY<STRUCT<...>>
>

Questo STRUCT contiene tre campi: data_change_record, heartbeat_record e child_partitions_record, ognuno di tipo ARRAY<STRUCT<...>>. In qualsiasi riga restituita dalla funzione di lettura dello stream di modifiche, solo uno di questi tre campi contiene un valore; gli altri due sono vuoti o NULL. Questi campi array contengono al massimo un elemento.

Le sezioni seguenti esaminano ciascuno di questi tre tipi di record.

PostgreSQL

La funzione di lettura dei modifiche in tempo reale restituisce una singola colonna ChangeRecord di tipo JSON con la seguente struttura:

{
  "data_change_record" : {},
  "heartbeat_record" : {},
  "child_partitions_record" : {}
}

In questo oggetto sono possibili tre chiavi: data_change_record, heartbeat_record e child_partitions_record, il tipo di valore corrispondente è JSON. In qualsiasi riga restituita dalla funzione di lettura del flusso di modifiche, esiste solo una di queste tre chiavi.

Le sezioni seguenti esaminano ciascuno di questi tre tipi di record.

Record di modifica dei dati

Un record di modifica dei dati contiene un insieme di modifiche a una tabella con lo stesso tipo di modifica (inserimento, aggiornamento o eliminazione) eseguite nello stesso timestamp di commit in una partizione del flusso di modifiche per la stessa transazione. È possibile restituire più record di modifica dei dati per la stessa transazione in più partizioni dello stream di modifiche.

Tutti i record di modifica dei dati hanno i campi commit_timestamp, server_transaction_id e record_sequence, che insieme determinano l'ordine nel flusso di modifiche per un record di flusso. Questi tre campi sono sufficienti per derivare l'ordinamento delle modifiche e fornire coerenza esterna.

Tieni presente che più transazioni possono avere lo stesso timestamp di commit se toccano dati non sovrapposti. Il campo server_transaction_id offre la possibilità di distinguere quale insieme di modifiche (potenzialmente in più partizioni del flusso di modifiche) è stato emesso all'interno della stessa transazione. Se lo combini con i campi record_sequence e number_of_records_in_transaction, puoi memorizzare e ordinare tutti i record di una determinata transazione.

I campi di un record di modifica dei dati includono quanto segue:

GoogleSQL

Campo Tipo Descrizione
commit_timestamp TIMESTAMP Indica il timestamp in cui è stata eseguita la modifica.
record_sequence STRING Indica il numero di sequenza del record all'interno della transazione. I numeri di sequenza sono univoci e aumentano monotonicamente (ma non necessariamente in modo contiguo) all'interno di una transazione. Ordina i record per lo stesso server_transaction_id in base a record_sequence per ricostruire l'ordine delle modifiche all'interno della transazione. Spanner potrebbe ottimizzare questo ordinamento per migliorare le prestazioni e potrebbe non corrispondere sempre all'ordinamento originale che fornisci.
server_transaction_id STRING Fornisce una stringa univoca a livello globale che rappresenta la transazione in cui è stata eseguita la modifica. Il valore deve essere utilizzato solo nel contesto dell'elaborazione dei record di flusso delle modifiche e non è correlato all'ID transazione nell'API di Spanner.
is_last_record_in_transaction_in_partition BOOL Indica se questo è l'ultimo record per una transazione nella partizione corrente.
table_name STRING Nome della tabella interessata dalla modifica.
value_capture_type STRING

Descrive il tipo di acquisizione del valore specificato nella configurazione dello stream di modifiche al momento dell'acquisizione di questa modifica.

Il tipo di acquisizione del valore può essere uno dei seguenti:

  • OLD_AND_NEW_VALUES
  • NEW_ROW
  • NEW_VALUES
  • NEW_ROW_AND_OLD_VALUES

Il valore predefinito è OLD_AND_NEW_VALUES. Per ulteriori informazioni, consulta la sezione Tipi di acquisizione del valore.

column_types
[
  {
      "name": "STRING",
      "type": {
        "code": "STRING"
      },
      "is_primary_key": BOOLEAN
      "ordinal_position": NUMBER
    },
    ...
]
Indica il nome della colonna, il tipo di colonna, se è una chiave primaria e la posizione della colonna come definita nello schema (ordinal_position). La prima colonna di una tabella nello schema avrebbe una posizione ordinale di 1. Il tipo di colonna può essere nidificato per le colonne di array. Il formato corrisponde alla struttura del tipo descritta nel riferimento API Spanner.
mods
[
  {
    "keys": {"STRING" : "STRING"},
    "new_values": {
      "STRING" : "VALUE-TYPE",
      [...]
    },
    "old_values": {
      "STRING" : "VALUE-TYPE",
      [...]
    },
  },
  [...]
]
Descrive le modifiche apportate, inclusi i valori della chiave primaria, i valori precedenti e i nuovi valori delle colonne modificate o monitorate. La disponibilità e i contenuti dei valori precedenti e nuovi dipendono dal value_capture_type configurato. I campi new_values e old_values contengono solo le colonne non chiave.
mod_type STRING Descrive il tipo di modifica. Uno dei valori INSERT, UPDATE o DELETE.
number_of_records_in_transaction INT64 Indica il numero di record di modifica dei dati che fanno parte di questa transazione in tutte le partizioni del flusso di modifiche.
number_of_partitions_in_transaction INT64 Indica il numero di partizioni che restituiscono record di modifica dei dati per questa transazione.
transaction_tag STRING Indica il tag transazione associato a questa transazione.
is_system_transaction BOOL Indica se la transazione è una transazione di sistema.

PostgreSQL

Campo Tipo Descrizione
commit_timestamp STRING Indica il timestamp in corrispondenza del quale è stata eseguita la modifica.
record_sequence STRING Indica il numero di sequenza del record all'interno della transazione. I numeri di sequenza sono univoci e aumentano monotonicamente (ma non necessariamente in modo contiguo) all'interno di una transazione. Ordina i record per lo stesso server_transaction_id in base a record_sequence per ricostruire l'ordine delle modifiche all'interno della transazione.
server_transaction_id STRING Fornisce una stringa univoca a livello globale che rappresenta la transazione in cui è stata eseguita la modifica. Il valore deve essere utilizzato solo nel contesto dell'elaborazione dei record di flusso delle modifiche e non è correlato all'ID transazione nell'API di Spanner
is_last_record_in_transaction_in_partition BOOLEAN Indica se questo è l'ultimo record per una transazione nella partizione corrente.
table_name STRING Indica il nome della tabella interessata dalla modifica.
value_capture_type STRING

Descrive il tipo di acquisizione del valore specificato nella configurazione dello stream di modifiche al momento dell'acquisizione di questa modifica.

Il tipo di acquisizione del valore può essere uno dei seguenti:

  • OLD_AND_NEW_VALUES
  • NEW_ROW
  • NEW_VALUES
  • NEW_ROW_AND_OLD_VALUES

Il valore predefinito è OLD_AND_NEW_VALUES. Per saperne di più, consulta la sezione Tipi di acquisizione del valore.

column_types
[
  {
      "name": "STRING",
      "type": {
        "code": "STRING"
      },
      "is_primary_key": BOOLEAN
      "ordinal_position": NUMBER
    },
    ...
]
Indica il nome della colonna, il tipo di colonna, se si tratta di una chiave primaria e la posizione della colonna come definita nello schema (ordinal_position). La prima colonna di una tabella nello schema avrebbe una posizione ordinale di 1. Il tipo di colonna può essere nidificato per le colonne di array. Il formato corrisponde alla struttura del tipo descritta nel riferimento API Spanner.
mods
[
  {
    "keys": {"STRING" : "STRING"},
    "new_values": {
      "STRING" : "VALUE-TYPE",
      [...]
    },
    "old_values": {
      "STRING" : "VALUE-TYPE",
      [...]
    },
  },
  [...]
]
Descrive le modifiche apportate, inclusi i valori della chiave primaria, i valori precedenti e i nuovi valori delle colonne modificate o monitorate. La disponibilità e i contenuti dei valori precedenti e nuovi dipendono da value_capture_type configurato. I campi new_values e old_values contengono solo le colonne non chiave.
mod_type STRING Descrive il tipo di modifica. Uno dei valori INSERT, UPDATE o DELETE.
number_of_records_in_transaction INT64 Indica il numero di record di modifica dei dati che fanno parte di questa transazione in tutte le partizioni del flusso di modifiche.
number_of_partitions_in_transaction NUMBER Indica il numero di partizioni che restituiscono record di modifica dei dati per questa transazione.
transaction_tag STRING Indica il tag transazione associato a questa transazione.
is_system_transaction BOOLEAN Indica se la transazione è una transazione di sistema.

Esempio di record di modifica dei dati

Di seguito sono riportati due esempi di record di modifica dei dati. Descrivono una singola transazione in cui viene effettuato un trasferimento tra due conti. I due account si trovano in partizioni separate dello stream delle modifiche.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z",
        "Balance": 1500
      },
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}
"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  "record_sequence": "00000001",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
      "name": "Balance",
      "type": {"code": "INT"},
      "is_primary_key": false,
      "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id2"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 2000
      },
      "old_values": {
        "LastUpdate": "2022-01-20T11:25:00.199915Z",
        "Balance": 1500
      },
    },
    ...
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "OLD_AND_NEW_VALUES",
  "number_of_records_in_transaction": 2,
  "number_of_partitions_in_transaction": 2,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false,
}

Il seguente record di modifica dei dati è un esempio di record con il tipo di acquisizione del valore NEW_VALUES. Tieni presente che vengono compilati solo i nuovi valori. È stata modificata solo la colonna LastUpdate, pertanto è stata restituita solo questa colonna.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,
  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z"
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Il seguente record di modifica dei dati è un esempio di record con il tipo di acquisizione del valore NEW_ROW. È stata modificata solo la colonna LastUpdate, ma vengono restituite tutte le colonne monitorate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {}
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Il seguente record di modifica dei dati è un esempio di record con il tipo di acquisizione del valore NEW_ROW_AND_OLD_VALUES. È stata modificata solo la colonna LastUpdate, ma vengono restituite tutte le colonne monitorate. Questo tipo di acquisizione valore acquisisce il nuovo valore e il vecchio valore di LastUpdate.

"data_change_record": {
  "commit_timestamp": "2022-09-27T12:30:00.123456Z",
  // record_sequence is unique and monotonically increasing within a
  // transaction, across all partitions.
  "record_sequence": "00000000",
  "server_transaction_id": "6329047911",
  "is_last_record_in_transaction_in_partition": true,

  "table_name": "AccountBalance",
  "column_types": [
    {
      "name": "AccountId",
      "type": {"code": "STRING"},
      "is_primary_key": true,
      "ordinal_position": 1
    },
    {
      "name": "LastUpdate",
      "type": {"code": "TIMESTAMP"},
      "is_primary_key": false,
      "ordinal_position": 2
    },
    {
       "name": "Balance",
       "type": {"code": "INT"},
       "is_primary_key": false,
       "ordinal_position": 3
    }
  ],
  "mods": [
    {
      "keys": {"AccountId": "Id1"},
      "new_values": {
        "LastUpdate": "2022-09-27T12:30:00.123456Z",
        "Balance": 1000
      },
      "old_values": {
        "LastUpdate": "2022-09-26T11:28:00.189413Z"
      }
    }
  ],
  "mod_type": "UPDATE", // options are INSERT, UPDATE, DELETE
  "value_capture_type": "NEW_ROW_AND_OLD_VALUES",
  "number_of_records_in_transaction": 1,
  "number_of_partitions_in_transaction": 1,
  "transaction_tag": "app=banking,env=prod,action=update",
  "is_system_transaction": false
}

Record di battito cardiaco

Quando viene restituito un record heartbeat, significa che tutte le modifiche con commit_timestamp inferiore o uguale a quello del record heartbeat timestamp sono state restituite e i record di dati futuri in questa partizione devono avere timestamp di commit superiori a quelli restituiti dal record heartbeat. I record heartbeat vengono restituiti quando non sono state scritte modifiche ai dati in una partizione. Quando vengono scritte modifiche ai dati nella partizione, è possibile utilizzare data_change_record.commit_timestamp anziché heartbeat_record.timestamp per indicare che il lettore sta avanzando nella lettura della partizione.

Puoi utilizzare i record heartbeat restituiti sulle partizioni per sincronizzare i lettori in tutte le partizioni. Una volta che tutti i lettori hanno ricevuto un heartbeat maggiore o uguale a un timestamp A o hanno ricevuto dati o record di partizioni secondarie maggiori o uguali al timestamp A, i lettori sanno di aver ricevuto tutti i record di cui è stato eseguito il commit in corrispondenza o prima di quel timestamp A e possono iniziare a elaborare i record memorizzati nel buffer, ad esempio ordinando i record tra partizioni in base al timestamp e raggruppandoli per server_transaction_id.

Un record heartbeat contiene un solo campo:

GoogleSQL

Campo Tipo Descrizione
timestamp TIMESTAMP Indica il timestamp del record heartbeat.

PostgreSQL

Campo Tipo Descrizione
timestamp STRING Indica il timestamp del record heartbeat.

Record di heartbeat di esempio

Un esempio di record heartbeat, che comunica che sono stati restituiti tutti i record con timestamp minori o uguali al timestamp di questo record:

heartbeat_record: {
  "timestamp": "2022-09-27T12:35:00.312486Z"
}

Record di partizione secondaria

I record delle partizioni secondarie restituiscono informazioni sulle partizioni secondarie: i relativi token di partizione, i token delle partizioni principali e il start_timestamp che rappresenta il timestamp meno recente per cui le partizioni secondarie contengono record di modifica. I record i cui timestamp di commit precedono immediatamente child_partitions_record.start_timestamp vengono restituiti nella partizione corrente. Dopo aver restituito tutti i record delle partizioni secondarie per questa partizione, questa query restituisce uno stato di successo, a indicare che tutti i record sono stati restituiti per questa partizione.

I campi di un record di partizione secondaria includono quanto segue:

GoogleSQL

Campo Tipo Descrizione
start_timestamp TIMESTAMP Indica che i record di modifica dei dati restituiti dalle partizioni secondarie in questo record di partizione secondaria hanno un timestamp di commit maggiore o uguale a start_timestamp. Quando esegui una query su una partizione secondaria, la query deve specificare il token della partizione secondaria e un valore start_timestamp maggiore o uguale a child_partitions_token.start_timestamp. Tutti i record delle partizioni secondarie restituiti da una partizione hanno lo stesso start_timestamp e il timestamp rientra sempre tra start_timestamp e end_timestamp specificati nella query.
record_sequence STRING Indica un numero di sequenza crescente in modo monotono che può essere utilizzato per definire l'ordinamento dei record di partizione secondaria quando vengono restituiti più record di partizione secondaria con lo stesso start_timestamp in una determinata partizione. Il token di partizione, start_timestamp e record_sequence identificano in modo univoco un record di partizione secondaria.
child_partitions
[
  {
    "token" : "STRING",
    "parent_partition_tokens" : ["STRING"]
  }
]
Restituisce un insieme di partizioni secondarie e le informazioni associate. Sono inclusi la stringa del token di partizione utilizzata per identificare la partizione secondaria nelle query, nonché i token delle relative partizioni principali.

PostgreSQL

Campo Tipo Descrizione
start_timestamp STRING Indica che i record di modifica dei dati restituiti dalle partizioni secondarie in questo record di partizione secondaria hanno un timestamp di commit maggiore o uguale a start_timestamp. Quando esegui una query su una partizione secondaria, la query deve specificare il token della partizione secondaria e un valore start_timestamp maggiore o uguale a child_partitions_token.start_timestamp. Tutti i record delle partizioni secondarie restituiti da una partizione hanno lo stesso start_timestamp e il timestamp rientra sempre tra start_timestamp e end_timestamp specificati nella query.
record_sequence STRING Indica un numero di sequenza crescente in modo monotono che può essere utilizzato per definire l'ordinamento dei record di partizione secondaria quando vengono restituiti più record di partizione secondaria con lo stesso start_timestamp in una determinata partizione. Il token di partizione, start_timestamp e record_sequence identificano in modo univoco un record di partizione secondaria.
child_partitions
[
  {
    "token": "STRING",
    "parent_partition_tokens": ["STRING"],
  }, [...]
]
Restituisce un array di partizioni secondarie e le relative informazioni. Sono inclusi la stringa del token di partizione utilizzata per identificare la partizione secondaria nelle query, nonché i token delle relative partizioni principali.

Esempio di record di partizione secondaria

Di seguito è riportato un esempio di record di partizione secondaria:

child_partitions_record: {
  "start_timestamp": "2022-09-27T12:40:00.562986Z",
  "record_sequence": "00000001",
  "child_partitions": [
    {
      "token": "child_token_1",
      // To make sure changes for a key is processed in timestamp
      // order, wait until the records returned from all parents
      // have been processed.
      "parent_partition_tokens": ["parent_token_1", "parent_token_2"]
    }
  ],
}

Flusso di lavoro per le query dei flussi di modifiche

Esegui query sul flusso di modifiche utilizzando l'API ExecuteStreamingSql, con una transazione di sola lettura monouso e un limite di timestamp rigido. La funzione di lettura dello stream di modifiche ti consente di specificare start_timestamp e end_timestamp per l'intervallo di tempo di interesse. Tutti i record di modifica entro il periodo di conservazione sono accessibili utilizzando il limite temporale di sola lettura rigoroso.

Tutti gli altri TransactionOptions non sono validi per le query di change stream. Inoltre, se TransactionOptions.read_only.return_read_timestamp è impostato su true, nel messaggio Transaction che descrive la transazione viene restituito un valore speciale di kint64max - 1 anziché un timestamp di lettura valido. Questo valore speciale deve essere ignorato e non utilizzato per query successive.

Ogni query di stream delle modifiche può restituire un numero qualsiasi di righe, ognuna contenente un record di modifica dei dati, un record heartbeat o un record di partizioni secondarie. Non è necessario impostare una scadenza per la richiesta.

Esempio di workflow di query di modifica in tempo reale

Il flusso di lavoro della query di streaming inizia con l'emissione della prima query dello stream di modifiche specificando partition_token in NULL. La query deve specificare la funzione di lettura per lo stream delle modifiche, il timestamp di inizio e di fine di interesse e l'intervallo di heartbeat. Quando end_timestamp è NULL, la query continua a restituire le modifiche ai dati fino al termine della partizione.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:00Z",
  end_timestamp => NULL,
  partition_token => NULL,
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:00Z',
  NULL,
  NULL,
  10000,
  NULL
) ;

Elabora i record di dati di questa query finché non vengono restituiti tutti i record delle partizioni secondarie. Nell'esempio seguente, vengono restituiti due record di partizione secondaria e tre token di partizione, quindi la query termina. I record della partizione secondaria di una query specifica condividono sempre lo stesso start_timestamp.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": "1000012389",
  "child_partitions": [
    {
      "token": "child_token_1",
      // Note parent tokens are null for child partitions returned
        // from the initial change stream queries.
      "parent_partition_tokens": [NULL]
    }
    {
      "token": "child_token_2",
      "parent_partition_tokens": [NULL]
    }
  ],
}
child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:00:01Z",
  "record_sequence": "1000012390",
  "child_partitions": [
    {
      "token": "child_token_3",
      "parent_partition_tokens": [NULL]
    }
  ],
}

Per elaborare le modifiche dopo il giorno 2022-05-01T09:00:01Z, crea tre nuove query ed eseguile in parallelo. Utilizzate insieme, le tre query restituiscono le modifiche ai dati per lo stesso intervallo di chiavi coperto dal relativo elemento principale. Imposta sempre start_timestamp su start_timestamp nello stesso record di partizione secondaria e utilizza lo stesso end_timestamp e lo stesso intervallo di heartbeat per elaborare i record in modo coerente in tutte le query.

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_1",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_2",
  heartbeat_milliseconds => 10000
);
SELECT ChangeRecord FROM READ_SingersNameStream (
  start_timestamp => "2022-05-01T09:00:01Z",
  end_timestamp => NULL,
  partition_token => "child_token_3",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_1',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_2',
  10000,
  NULL
);
SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:00:01Z',
  NULL,
  'child_token_3',
  10000,
  NULL
);

La query su child_token_2 termina dopo aver restituito un altro record di partizione secondaria. Questo record indica che una nuova partizione copre le modifiche sia per child_token_2 sia per child_token_3 a partire dal giorno 2022-05-01T09:30:15Z. La query del giorno child_token_3 restituisce lo stesso record perché entrambe sono le partizioni principali del nuovo child_token_4. Per garantire un'elaborazione rigorosa e ordinata dei record di dati per una determinata chiave, la query su child_token_4 deve iniziare dopo che tutti i genitori hanno terminato. In questo caso, i genitori sono child_token_2 e child_token_3. Crea una sola query per ogni token di partizione secondaria. La progettazione del flusso di lavoro della query deve assegnare un genitore all'attesa e pianificare la query su child_token_4.

child_partitions_record: {
  "record_type": "child_partitions",
  "start_timestamp": "2022-05-01T09:30:15Z",
  "record_sequence": "1000012389",
  "child_partitions": [
    {
      "token": "child_token_4",
      "parent_partition_tokens": ["child_token_2", "child_token_3"],
    }
  ],
}

GoogleSQL

SELECT ChangeRecord FROM READ_SingersNameStream(
  start_timestamp => "2022-05-01T09:30:15Z",
  end_timestamp => NULL,
  partition_token => "child_token_4",
  heartbeat_milliseconds => 10000
);

PostgreSQL

SELECT *
FROM "spanner"."read_json_SingersNameStream" (
  '2022-05-01T09:30:15Z',
  NULL,
  'child_token_4',
  10000,
  NULL
);

Trova esempi di gestione e analisi dei record di stream di modifiche nel connettore Dataflow SpannerIO di Apache Beam su GitHub.