Eventi e flussi di dati

La gerarchia dei dati in Datastream è:

  • Uno stream, costituito da un'origine dati e una destinazione.
  • Un oggetto, ovvero una parte di uno stream, ad esempio una tabella di un database specifico.
  • Un evento, ovvero una singola modifica generata da un oggetto specifico, ad esempio un inserimento nel database.

Flussi, oggetti ed eventi hanno dati e metadati associati. Questi dati e metadati possono essere utilizzati per scopi diversi.

Informazioni sugli eventi

Ogni evento è composto da tre tipi di dati:

  • Dati sugli eventi: rappresenta la modifica ai dati stessi dell'oggetto proveniente dall'origine del flusso. Ogni evento contiene l'intera riga modificata.
  • Metadati generici: questi metadati vengono visualizzati in ogni evento generato da Datastream e vengono utilizzati per azioni come la rimozione dei dati duplicati nella destinazione.
  • Metadati specifici dell'origine: questi metadati vengono visualizzati in ogni evento generato da una specifica origine stream. Questi metadati variano in base all'origine.

Dati sull'evento

I dati sugli eventi sono il payload di ogni modifica di un determinato oggetto proveniente da un'origine stream.

Gli eventi sono in formato Avro o JSON.

Quando utilizzi il formato Avro, per ogni colonna l'evento contiene l'indice e il valore della colonna. Utilizzando l'indice della colonna, il nome della colonna e il tipo unificato possono essere recuperati dallo schema nell'intestazione Avro.

Quando lavori con il formato JSON, per ogni colonna l'evento contiene il nome e il valore della colonna.

I metadati evento possono essere utilizzati per raccogliere informazioni sull'origine dell'evento, nonché per rimuovere i dati duplicati nella destinazione e ordinare gli eventi in base al consumer downstream.

Le tabelle seguenti elencano e descrivono i campi e i tipi di dati per i metadati degli eventi generici e specifici per l'origine.

Metadati generici

Questi metadati sono coerenti in tutti i tipi di stream.

Campo Tipo Avro Tipo JSON Descrizione
stream_name string string Il nome univoco dello stream definito al momento della creazione.
read_method string string

Indica se i dati sono stati letti dall'origine utilizzando un metodo Change Data Capture (CDC), nell'ambito del backfill storico o nell'ambito di un'attività di integrazione creata quando una transazione viene eseguita il rollback durante la replica CDC.

I valori possibili sono:

  • oracle-cdc-logminer
  • oracle-backfill
  • oracle-supplementation
  • mysql-cdc-binlog
  • mysql-backfill-incremental
  • mysql-backfill-fulldump
  • postgres-cdc-wal
  • postgresql-backfill
  • salesforce-cdc
  • salesforce-backfill
object string string Il nome utilizzato per raggruppare diversi tipi di eventi, in genere il nome della tabella o dell'oggetto nella sorgente.
schema_key string string L'identificatore univoco dello schema unificato dell'evento.
uuid string string Un identificatore univoco dell'evento generato da Datastream.
read_timestamp timestamp-millis string Il timestamp (UTC) in cui il record è stato letto da Datastream (il timestamp dell'epoca in millisecondi).
source_timestamp timestamp-millis string Il timestamp (UTC) in cui il record è stato modificato nell'origine (il timestamp epoch in millisecondi).
sort_keys {"type": "array", "items": ["string", "long"]} matrice Un array di valori che possono essere utilizzati per ordinare gli eventi in base all'ordine in cui si sono verificati.

Metadati specifici dell'origine

Questi metadati sono associati agli eventi CDC e di backfill di un database di origine. Per visualizzare questi metadati, seleziona un'origine dal menu a discesa che segue.

Origine Campo Tipo Avro Tipo JSON Descrizione
MySQL log_file string string Il file di log da cui Datastream estrae gli eventi nella replica CDC.
MySQL log_position Lungo Lungo La posizione (offset) del log nel log binario MySQL.
MySQL primary_keys array di stringhe array di stringhe L'elenco di uno o più nomi di colonne che compongono la chiave primaria delle tabelle. Se la tabella non ha una chiave primaria, questo campo è vuoto.
MySQL is_deleted boolean boolean
  • Un valore true indica che la riga è stata eliminata nell'origine.
  • Un valore false indica che la riga non è stata eliminata.
MySQL database string string Il database associato all'evento.
MySQL table string string La tabella associata all'evento.
MySQL change_type string string

Il tipo di modifica (INSERT, UPDATE-INSERT, UPDATE-DELETE e DELETE) che rappresenta l'evento.

Oracle log_file string string Il file di log da cui Datastream estrae gli eventi nella replica CDC.
Oracle scn Lungo Lungo La posizione (offset) del log nel log delle transazioni Oracle.
Oracle row_id string string row_id di Oracle.
Oracle is_deleted boolean boolean
  • Un valore true indica che la riga è stata eliminata nell'origine.
  • Un valore false indica che la riga non è stata eliminata.
Oracle database string string Il database associato all'evento.
Oracle schema string string Lo schema associato alla tabella dell'evento.
Oracle table string string La tabella associata all'evento.
Oracle change_type string string

Il tipo di modifica (INSERT, UPDATE-INSERT, UPDATE-DELETE e DELETE) che rappresenta l'evento.

Oracle tx_id string string L'ID transazione a cui appartiene l'evento.
Oracle rs_id string string L'ID del set di record. L'accoppiamento di rs_id e ssn identifica in modo univoco una riga in V$LOGMNR_CONTENTS. rs_id identifica in modo univoco il record di ripetizione che ha generato la riga.
Oracle ssn Lungo Lungo Un numero di sequenza SQL. Questo numero viene utilizzato con rs_id e identifica in modo univoco una riga in V$LOGMNR_CONTENTS.
PostgreSQL schema string string Lo schema associato alla tabella dell'evento.
PostgreSQL table string string La tabella associata all'evento.
PostgreSQL is_deleted boolean boolean
  • Un valore true indica che la riga è stata eliminata nell'origine.
  • Un valore false indica che la riga non è stata eliminata.
PostgreSQL change_type string string Il tipo di modifica (INSERT, UPDATE, DELETE) che rappresenta l'evento.
PostgreSQL tx_id string string L'ID transazione a cui appartiene l'evento.
PostgreSQL lsn string string Il numero di sequenza del log per la voce corrente.
PostgreSQL primary_keys array di stringhe array di stringhe L'elenco di uno o più nomi di colonne che compongono la chiave primaria delle tabelle. Se la tabella non ha una chiave primaria, questo campo è vuoto.
SQL Server table string string La tabella associata all'evento.
SQL Server database Lungo Lungo Il database associato all'evento.
SQL Server schema array di stringhe array di stringhe Lo schema associato alla tabella dell'evento.
SQL Server is_deleted boolean boolean
  • Un valore true indica che la riga è stata eliminata nell'origine.
  • Un valore false indica che la riga non è stata eliminata.
SQL Server lsn string string Il numero di sequenza del log per l'evento.
SQL Server tx_id string string L'ID transazione a cui appartiene l'evento.
SQL Server physical_location array di numeri interi array di numeri interi La posizione fisica del record di log descritta da tre numeri interi: ID file, ID pagina e ID slot del record.
SQL Server replication_index array di stringhe array di stringhe L'elenco dei nomi delle colonne di un indice che può identificare in modo univoco una riga nella tabella.
SQL Server change_type string string

Il tipo di modifica (INSERT, UPDATE, DELETE) che rappresenta l'evento.

Spanner commit_timestamp string string Il timestamp del commit dell'evento.
Spanner snapshot boolean boolean Se l'evento è un evento snapshot di backfill.
Spanner project_id string string L'identificatore del progetto Spanner.
Spanner instance_id string string L'identificatore dell'istanza Spanner.
Spanner database_id string string L'identificatore del database Spanner.
Spanner change_stream_name string string La modifica in tempo reale di Spanner.
Spanner table string string La tabella Spanner.
Spanner server_transaction_id string string L'identificatore della transazione associato all'evento.
Spanner record_sequence string string La sequenza di record associata all'evento.
Spanner mod_index string string Il numero di modifica associato all'evento.
Spanner transaction_tag string string Il tag per la transazione associata.
Spanner system_transaction string string Indica se la transazione è una transazione di sistema.
Spanner number_of_records_in_transaction Lungo Lungo Il numero di record nella transazione associata.
Spanner value_capture_type enum enum Il tipo di acquisizione del valore della modifica in tempo reale.
Spanner mod_type enum enum Indica se il record era un INSERT, un UPDATE o un DELETE.
Spanner primary_keys matrice matrice L'elenco delle colonne di chiave primaria per la tabella.
Spanner is_deleted boolean boolean Se la modifica rappresenta o meno un evento DELETE.
Salesforce object_name string string

Il nome dell'oggetto Salesforce associato all'evento.

Salesforce domain string string

Il nome del dominio associato all'evento.

Salesforce is_deleted boolean boolean
  • Un valore true indica che la riga è stata eliminata nell'origine.
  • Un valore false indica che la riga non è stata eliminata.
Salesforce change_type string string

Il tipo di modifica (INSERT, UPDATE, DELETE) che rappresenta l'evento.

Salesforce primary_keys array di stringhe array di stringhe L'elenco dei nomi delle colonne che compongono la chiave primaria della tabella. Se la tabella non ha una chiave primaria, questo campo è vuoto.
MongoDB database string string Il database associato all'evento.
MongoDB collection string string La raccolta associata all'evento. Le raccolte sono analoghe alle tabelle nei database relazionali.
MongoDB change_type string string Il tipo di modifica (CREATE, UPDATE e DELETE) rappresentata dall'evento.
MongoDB is_deleted boolean boolean
  • Un valore true indica che la riga è stata eliminata nell'origine.
  • Un valore false indica che la riga non è stata eliminata.
MongoDB primary_keys array di stringhe array di stringhe Il campo _id, che funge da chiave primaria per ogni documento di una raccolta.

Esempio di un flusso di eventi

Questo flusso illustra gli eventi generati da tre operazioni consecutive: INSERT, UPDATE e DELETE, su una singola riga di una tabella SAMPLE per un database di origine.

TEMPO THIS_IS_MY_PK (int) FIELD1 (nchar nullable) FIELD2 (nchar non-null)>
0 1231535353 foo TLV
1 1231535353 NULL TLV

INSERT (T0)

Il payload del messaggio è costituito dall'intera nuova riga.

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "d7989206-380f-0e81-8056-240501101100",
  "read_timestamp": "2019-11-07T07:37:16.808Z",
  "source_timestamp": "2019-11-07T02:15:39",  
  "source_metadata": {
    "log_file": ""
    "scn": 15869116216871,
    "row_id": "AAAPwRAALAAMzMBABD",
    "is_deleted": false,
    "database": "DB1",
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "INSERT",
    "tx_id": 
    "rs_id": "0x0073c9.000a4e4c.01d0",
    "ssn": 67,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": "foo",
    "FIELD2": "TLV",
  }
}

AGGIORNAMENTO (T1)

Il payload del messaggio è costituito dall'intera nuova riga. Non include i valori precedenti.

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "e6067366-1efc-0a10-a084-0d8701101101",
  "read_timestamp": "2019-11-07T07:37:18.808Z",
  "source_timestamp": "2019-11-07T02:17:39",  
  "source_metadata": {
    "log_file": 
    "scn": 15869150473224,
    "row_id": "AAAGYPAATAAPIC5AAB",
    "is_deleted": false,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "UPDATE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0010",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}

DELETE (T2)

Il payload del messaggio è costituito dall'intera nuova riga.

{
  "stream_name": "projects/myProj/locations/myLoc/streams/Oracle-to-Source",
  "read_method": "oracle-cdc-logminer",
  "object": "SAMPLE.TBL",
  "uuid": "c504f4bc-0ffc-4a1a-84df-6aba382fa651",
  "read_timestamp": "2019-11-07T07:37:20.808Z",
  "source_timestamp": "2019-11-07T02:19:39",
  "source_metadata": {
    "log_file": 
    "scn": 158691504732555,
    "row_id": "AAAGYPAATAAPIC5AAC",
    "is_deleted": true,
    "database":
    "schema": "ROOT",
    "table": "SAMPLE"
    "change_type": "DELETE",
    "tx_id":
    "rs_id": "0x006cf4.00056b26.0011",
    "ssn": 0,
  },
  "payload": {
    "THIS_IS_MY_PK": "1231535353",
    "FIELD1": null,
    "FIELD2": "TLV",
  }
}

Ordine e coerenza

Questa sezione spiega come Datastream gestisce l'ordine e la coerenza.

Ordini

Datastream non garantisce l'ordinamento, ma ogni evento contiene l'intera riga di dati e il timestamp del momento in cui i dati sono stati scritti nell'origine. In BigQuery, gli eventi non in ordine vengono uniti automaticamente nella sequenza corretta. BigQuery utilizza i metadati degli eventi e un numero di sequenza di modifica (CSN) interno per applicare gli eventi alla tabella nell'ordine corretto. In Cloud Storage, gli eventi dello stesso periodo di tempo possono estendersi su più file.

Gli eventi generati in modo non sequenziale si verificano intenzionalmente quando viene eseguito il backfill per il backfill iniziale dei dati creati all'avvio dello stream.

L'ordinamento può essere dedotto in base all'origine.

Origine Descrizione
MySQL

Gli eventi che fanno parte del backfill iniziale hanno il campo read_method che inizia con mysql-backfill. L'ordine in cui vengono ricevuti gli eventi nel backfill non ha alcuna implicazione, in quanto possono essere utilizzati in qualsiasi ordine.

Gli eventi che fanno parte della replica in corso hanno il campo read_method impostato su mysql-cdc-binlog.

L'ordine può essere dedotto dalla combinazione del campo log_file e del campo log_position, che è offset rispetto al file di log. Questa combinazione fornisce un numero univoco e in aumento incrementale che identifica l'ordine di operazione nel database.

Oracle

Gli eventi che fanno parte del backfill iniziale hanno il campo read_method che inizia con oracle-backfill. L'ordine in cui vengono ricevuti gli eventi nel backfill non ha alcuna implicazione, in quanto possono essere utilizzati in qualsiasi ordine.

Gli eventi che fanno parte della replica in corso hanno il campo read_method impostato su oracle-cdc-logminer.

L'ordine può essere dedotto dalla combinazione del campo rs_id (l'ID set di record) e del campo ssn (un numero di sequenza SQL). Questa combinazione fornisce un numero univoco e in aumento incrementale che identifica l'ordine di operazione nel database.

PostgreSQL

Gli eventi che fanno parte del backfill iniziale hanno il campo read_method che inizia con postgresql-backfill. L'ordine in cui vengono ricevuti gli eventi nel backfill non ha alcuna implicazione, in quanto possono essere utilizzati in qualsiasi ordine.

Gli eventi che fanno parte della replica in corso hanno il campo read_method impostato su postgres-cdc-wal.

L'ordine può essere dedotto dalla combinazione del campo source_timestamp e del campo lsn (numero di sequenza del log). Questa combinazione fornisce un numero univoco e in aumento incrementale che identifica l'ordine di operazione nel database.

SQL Server

Gli eventi che fanno parte del backfill iniziale hanno il campo read_method che inizia con sqlserver-backfill. L'ordine in cui vengono ricevuti gli eventi nel backfill non ha alcuna implicazione, in quanto possono essere utilizzati in qualsiasi ordine.

Gli eventi che fanno parte della replica in corso hanno il campo read_method impostato su sqlserver-cdc.

L'ordine può essere dedotto dalla combinazione del campo source_timestamp e del campo lsn (numero di sequenza del log). Questa combinazione fornisce un numero univoco e in aumento incrementale che identifica l'ordine di operazione nel database.

Quando esegui la replica in BigQuery, viene utilizzato anche l'orario di creazione del database per garantire l'ordinamento corretto. Ciò è importante negli scenari in cui l'istanza del database di origine potrebbe cambiare, ad esempio durante le migrazioni o il failover a una replica.

Salesforce

L'ordine può essere determinato utilizzando source_timestamp del record come chiave di ordinamento. Il timestamp in Salesforce ha una risoluzione di un secondo, ma non possono verificarsi due eventi di modifica per lo stesso record nello stesso secondo.

MongoDB (anteprima)

L'ordine può essere determinato utilizzando il campo ts nel log delle operazioni o il campo clusterTime nello stream di modifiche per il record. I campi sono univoci per ogni record.

Spanner

L'ordine può essere determinato utilizzando la combinazione dei campi commit_timestamp, record_sequence e mod_index. Questa combinazione fornisce un valore univoco e in aumento incrementale che identifica l'ordine delle operazioni nel database.

Coerenza

Datastream garantisce che i dati del database di origine vengano recapitati alla destinazione almeno una volta. Nessun evento viene perso, ma esiste la possibilità di eventi duplicati nel flusso. La finestra per gli eventi duplicati deve essere dell'ordine di minuti e l'identificatore univoco universale (UUID) dell'evento nei metadati dell'evento può essere utilizzato per rilevare i duplicati.

Quando i file di log del database contengono transazioni non sottoposte a commit, se vengono eseguite operazioni di rollback, il database lo riflette nei file di log come operazioni di data manipulation language (DML) "inversa". Ad esempio, un'operazione INSERT di rollback avrà un'operazione DELETE corrispondente. Datastream legge queste operazioni dai file di log.

Informazioni sugli stream

Ogni stream ha metadati che descrivono sia lo stream sia l'origine da cui vengono estratti i dati. Questi metadati includono informazioni come il nome dello stream e i profili di connessione di origine e destinazione.

Per visualizzare la definizione completa dell'oggetto Stream, consulta la documentazione Riferimento API.

Stato e stato del flusso

Uno stream può avere uno dei seguenti stati:

  • Not started
  • Starting
  • Running
  • Draining
  • Paused
  • Failed
  • Failed permanently

Puoi utilizzare i log per trovare informazioni aggiuntive sullo stato, ad esempio il backfilling delle tabelle o il numero di righe elaborate. Puoi anche utilizzare l'API FetchStreamErrors per recuperare gli errori.

Metadati degli oggetti disponibili tramite l'API Discover

L'API Discover restituisce oggetti che rappresentano la struttura degli oggetti definiti nell'origine dati o nella destinazione rappresentata dal profilo di connessione. Ogni oggetto ha metadati sull'oggetto stesso, nonché per ogni campo di dati che estrae. Questi metadati sono disponibili utilizzando l'API Discover.

Passaggi successivi