Utilizza l'ETL inversa per caricare i dati da BigQuery in Spanner Graph

Questo documento descrive come utilizzare le pipeline di estrazione, trasformazione e caricamento (ETL) inversi per spostare e sincronizzare continuamente i dati del grafico da BigQuery a Spanner Graph. Tratta i seguenti aspetti chiave:

Per utilizzare l'ETL inversa per esportare i dati da BigQuery a Spanner, consulta Esportare i dati in Spanner.

BigQuery esegue la manipolazione complessa dei dati su larga scala come piattaforma di elaborazione analitica, mentre Spanner è ottimizzato per i casi d'uso che richiedono QPS elevati e bassa latenza di servizio. Spanner Graph e BigQuery si integrano in modo efficace per preparare i dati del grafico nelle pipeline di analisi BigQuery, consentendo a Spanner di gestire le traversie del grafico a bassa latenza.

Prima di iniziare

  1. Crea un'istanza Spanner con un database contenente dati del grafico. Per saperne di più, vedi Configura ed esegui query su Spanner Graph.

  2. In BigQuery, crea una prenotazione di slot di livello Enterprise o Enterprise Plus. Puoi ridurre i costi di calcolo di BigQuery quando esegui le esportazioni in Spanner Graph. Per farlo, imposta una capacità di base degli slot pari a zero e attiva la scalabilità automatica.

  3. Concedi ruoli IAM (Identity and Access Management) che forniscono agli utenti le autorizzazioni necessarie per eseguire ogni attività descritta in questo documento.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per esportare i dati dei grafici BigQuery in Spanner Graph, chiedi all'amministratore di concederti i seguenti ruoli IAM nel progetto:

Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Casi d'uso di Reverse ETL

Di seguito sono riportati alcuni esempi di casi d'uso. Dopo aver analizzato ed elaborato i dati in BigQuery, puoi spostarli in Spanner Graph utilizzando l'ETL inversa.

Aggregazione e riepilogo dei dati: utilizza BigQuery per calcolare gli aggregati sui dati granulari per renderli più adatti ai casi d'uso operativi.

Trasformazione e arricchimento dei dati: utilizza BigQuery per pulire e standardizzare i dati ricevuti da diverse origini dati.

Filtro e selezione dei dati: utilizza BigQuery per filtrare un set di dati di grandi dimensioni a fini di analisi. Ad esempio, potresti filtrare i dati che non sono necessari per le applicazioni in tempo reale.

Pre-elaborazione e feature engineering: in BigQuery, utilizza la funzione ML.TRANSFORM per trasformare i dati o la funzione ML.FEATURE_CROSS per creare incroci di caratteristiche delle caratteristiche di input. Quindi, utilizza l'ETL inversa per spostare i dati risultanti in Spanner Graph.

Informazioni sulla pipeline ETL inversa

I dati vengono spostati da BigQuery a Spanner Graph in una pipeline ETL inversa in due passaggi:

  1. BigQuery utilizza gli slot assegnati al job della pipeline per estrarre e trasformare i dati di origine.

  2. La pipeline ETL inversa di BigQuery utilizza le API Spanner per caricare i dati in un'istanza Spanner di cui è stato eseguito il provisioning.

Il seguente diagramma mostra i passaggi di una pipeline ETL inversa:

Un diagramma che mostra i tre passaggi principali quando i dati vengono spostati da BigQuery a Spanner Graph in una pipeline ETL inversa.

Figura 1. Processo della pipeline ETL inversa di BigQuery

Gestire le modifiche ai dati del grafico

Puoi utilizzare l'ETL inversa per:

  • Caricare un set di dati del grafico da BigQuery in Spanner Graph.

  • Sincronizza i dati di Spanner Graph con gli aggiornamenti in corso di un set di dati in BigQuery.

Configuri una pipeline ETL inversa con una query SQL per specificare i dati di origine e la trasformazione da applicare. La pipeline carica tutti i dati che soddisfano la clausola WHERE dell'istruzione SELECT in Spanner utilizzando un'operazione upsert. Un'operazione upsert equivale alle istruzioni INSERT OR UPDATE. Inserisce nuove righe e aggiorna quelle esistenti nelle tabelle che archiviano i dati del grafico. La pipeline basa le righe nuove e aggiornate su una chiave primaria della tabella Spanner.

Inserire e aggiornare i dati per le tabelle con dipendenze dall'ordine di caricamento

Le best practice di progettazione dello schema di Spanner Graph consigliano di utilizzare tabelle interleaved e chiavi esterne. Se utilizzi tabelle interleaved o chiavi esterne imposte, devi caricare i dati di nodi e archi in un ordine specifico. Ciò garantisce che le righe a cui viene fatto riferimento esistano prima di creare la riga di riferimento. Per ulteriori informazioni, vedi Creare tabelle interleaved.

Il seguente schema della tabella di input del grafico di esempio utilizza una tabella interleaved e un vincolo di chiave esterna per modellare la relazione tra una persona e i suoi account:

CREATE TABLE Person (
  id    INT64 NOT NULL,
  name  STRING(MAX)
) PRIMARY KEY (id);

CREATE TABLE Account (
  id           INT64 NOT NULL,
  create_time  TIMESTAMP,
  is_blocked   BOOL,
  type        STRING(MAX)
) PRIMARY KEY (id);

CREATE TABLE PersonOwnAccount (
  id           INT64 NOT NULL,
  account_id   INT64 NOT NULL,
  create_time  TIMESTAMP,
  CONSTRAINT FK_Account FOREIGN KEY (account_id) REFERENCES Account (id)
) PRIMARY KEY (id, account_id),
  INTERLEAVE IN PARENT Person ON DELETE CASCADE;

CREATE PROPERTY GRAPH FinGraph
  NODE TABLES (
    Person,
    Account
  )
  EDGE TABLES (
    PersonOwnAccount
      SOURCE KEY (id) REFERENCES Person
      DESTINATION KEY (account_id) REFERENCES Account
      LABEL Owns
  );

In questo schema di esempio, PersonOwnAccount è una tabella interleaved in Person. Carica gli elementi nella tabella Person prima di quelli nella tabella PersonOwnAccount. Inoltre, il vincolo di chiave esterna su PersonOwnAccount garantisce l'esistenza di una riga corrispondente in Account, la destinazione della relazione edge. Pertanto, carica la tabella Account prima della tabella PersonOwnAccount. Il seguente elenco riepiloga le dipendenze dell'ordine di caricamento di questo schema:

Segui questi passaggi per caricare i dati:

  1. Carica Person prima del giorno PersonOwnAccount.
  2. Carica Account prima del giorno PersonOwnAccount.

Spanner applica i vincoli di integrità referenziale nello schema di esempio. Se la pipeline tenta di creare una riga nella tabella PersonOwnAccount senza una riga corrispondente nella tabella Person o nella tabella Account, Spanner restituisce un errore. La pipeline non va a buon fine.

Questa pipeline ETL inversa di esempio utilizza istruzioni EXPORTDATA in BigQuery per esportare i dati dalle tabelle Person, Account e PersonOwnAccount in un set di dati per soddisfare l'ordine di caricamento delle dipendenze:

BEGIN
EXPORT DATA OPTIONS (
    uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
    format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "Person",
      "priority": "HIGH",
      "tag" : "graph_data_load_person"
    }"""
  ) AS
  SELECT
    id,
    name
  FROM
    DATASET_NAME.Person;

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
  spanner_options="""{
    "table": "Account",
    "priority": "HIGH",
    "tag" : "graph_data_load_account"
  }"""
) AS
SELECT
  id,
  create_time,
  is_blocked,
  type
FROM
  DATASET_NAME.Account;

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
  spanner_options="""{
    "table": "PersonOwnAccount",
    "priority": "HIGH",
    "tag" : "graph_data_load_person_own_account"
  }"""
) AS
SELECT
  id,
  account_id,
  create_time
FROM
  DATASET_NAME.PersonOwnAccount;
END;

Sincronizzare i dati

Per sincronizzare BigQuery con Spanner Graph, utilizza pipeline ETL inverse. Puoi configurare una pipeline per eseguire una delle seguenti operazioni:

  • Applica eventuali inserimenti e aggiornamenti dall'origine BigQuery alla tabella di destinazione Spanner Graph. Puoi aggiungere elementi dello schema alle tabelle di destinazione per comunicare logicamente le eliminazioni e rimuovere le righe delle tabelle di destinazione in base a una pianificazione.

  • Utilizza una funzione di serie temporale che applica operazioni di inserimento e aggiornamento e identifica le operazioni di eliminazione.

Vincoli di integrità referenziale

A differenza di Spanner, BigQuery non applica vincoli di chiave primaria e chiave esterna. Se i dati BigQuery non rispettano i vincoli creati nelle tabelle Spanner, la pipeline ETL inversa potrebbe non riuscire a caricare i dati.

L'ETL inversa raggruppa automaticamente i dati in batch che non superano il limite massimo di mutazioni per commit e applica atomicamente i batch a una tabella Spanner in un ordine arbitrario. Se un batch contiene dati che non superano un controllo di integrità referenziale, Spanner non carica il batch. Esempi di questi errori includono una riga secondaria intercalata priva di una riga principale o una colonna con chiave esterna forzata senza un valore corrispondente nella colonna a cui fa riferimento. Se un batch non supera un controllo, la pipeline genera un errore e smette di caricare i batch.

Comprendere gli errori di vincolo di integrità referenziale

I seguenti esempi mostrano gli errori di vincolo di integrità referenziale che potresti riscontrare:

Risolvi gli errori di vincolo di chiave esterna
  • Errore: "Il vincolo di chiave esterna FK_Account viene violato nella tabella PersonOwnAccount. Impossibile trovare i valori a cui viene fatto riferimento in Account(id)"

  • Causa: l'inserimento di una riga nella tabella PersonOwnAccount non è riuscito perché manca una riga corrispondente nella tabella Account, richiesta dalla chiave esterna FK_Account.

Risolvere gli errori relativi alla riga principale mancante
  • Errore: "Manca la riga principale per la riga [15,1] nella tabella PersonOwnAccount"

  • Causa: l'inserimento di una riga in PersonOwnAccount (id: 15 e account_id: 1) non è riuscito perché manca una riga principale nella tabella Person (id: 15).

Per ridurre il rischio di errori di integrità referenziale, valuta le seguenti opzioni. Ogni opzione presenta compromessi.

  • Riduci i vincoli per consentire a Spanner Graph di caricare i dati.
  • Aggiungi logica alla pipeline per omettere le righe che violano i vincoli di integrità referenziale.

Rilassa l'integrità referenziale

Un'opzione per evitare errori di integrità referenziale durante il caricamento dei dati è di allentare i vincoli in modo che Spanner non applichi l'integrità referenziale.

  • Puoi creare tabelle interleaved con la clausola INTERLEAVE IN per utilizzare le stesse caratteristiche di interleaving delle righe fisiche. Se utilizzi INTERLEAVE IN anziché INTERLEAVE IN PARENT, Spanner non applica l'integrità referenziale, anche se le query traggono vantaggio dalla collocazione congiunta delle tabelle correlate.

  • Puoi creare chiavi esterne informative utilizzando l'opzione NOT ENFORCED. L'opzione NOT ENFORCED offre vantaggi di ottimizzazione delle query. Tuttavia, Spanner non applica l'integrità referenziale.

Ad esempio, per creare la tabella di input degli archi senza controlli di integrità referenziale, puoi utilizzare questo DDL:

CREATE TABLE PersonOwnAccount (
  id          INT64 NOT NULL,
  account_id  INT64 NOT NULL,
  create_time TIMESTAMP,
  CONSTRAINT FK_Account FOREIGN KEY (account_id) REFERENCES Account (id) NOT ENFORCED
) PRIMARY KEY (id, account_id),
INTERLEAVE IN Person;

Rispetta l'integrità referenziale nelle pipeline ETL inverse

Per assicurarti che la pipeline carichi solo le righe che soddisfano i controlli di integrità referenziale, includi solo le righe PersonOwnAccount che hanno righe corrispondenti nelle tabelle Person e Account. Quindi, mantieni l'ordine di caricamento, in modo che Spanner carichi le righe Person e Account prima delle righe PersonOwnAccount che fanno riferimento a queste.

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag" : "graph_data_load_person_own_account"
    }"""
  ) AS
  SELECT
    poa.id,
    poa.account_id,
    poa.create_time
  FROM `PROJECT_ID.DATASET_NAME.PersonOwnAccount` poa
    JOIN `PROJECT_ID.DATASET_NAME.Person` p ON (poa.id = p.id)
    JOIN `PROJECT_ID.DATASET_NAME.Account` a ON (poa.account_id = a.id)
  WHERE poa.id = p.id
    AND poa.account_id = a.id;

Elimina elementi del grafico

Le pipeline ETL inverse utilizzano operazioni di upsert. Poiché le operazioni di upsert sono equivalenti alle istruzioni INSERT OR UPDATE, una pipeline può sincronizzare solo le righe esistenti nei dati di origine in fase di runtime. Ciò significa che la pipeline esclude le righe eliminate. Se elimini dati da BigQuery, una pipeline ETL inversa non può rimuovere direttamente gli stessi dati da Spanner Graph.

Puoi utilizzare una delle seguenti opzioni per gestire le eliminazioni dalle tabelle di origine BigQuery:

Esegui un'eliminazione logica o temporanea nell'origine

Per contrassegnare logicamente le righe per l'eliminazione, utilizza un flag eliminato in BigQuery. Poi crea una colonna nella tabella Spanner di destinazione in cui puoi propagare il flag. Quando l'ETL inversa applica gli aggiornamenti della pipeline, elimina le righe che contengono questo flag in Spanner. Puoi trovare ed eliminare queste righe in modo esplicito utilizzando il DML partizionato. In alternativa, elimina implicitamente le righe configurando una colonna TTL (time to live) con una data che dipende dalla colonna del flag di eliminazione. Scrivi query Spanner per escludere queste righe eliminate logicamente. Ciò garantisce che Spanner escluda queste righe dai risultati prima dell'eliminazione pianificata. Una volta completata l'esecuzione della pipeline ETL inversa, Spanner riflette le eliminazioni logiche nelle sue righe. Puoi quindi eliminare le righe da BigQuery.

Questo esempio aggiunge una colonna is_deleted alla tabella PersonOwnAccount in Spanner. Aggiunge poi una colonna expired_ts_generated che dipende dal valore is_deleted. La policy TTL pianifica l'eliminazione delle righe interessate perché la data nella colonna generata è precedente alla soglia DELETION POLICY.

ALTER TABLE PersonOwnAccount
  ADD COLUMN is_deleted BOOL DEFAULT (FALSE);

ALTER TABLE PersonOwnAccount ADD COLUMN
  expired_ts_generated TIMESTAMP AS (IF(is_deleted,
    TIMESTAMP("1970-01-01 00:00:00+00"),
    TIMESTAMP("9999-01-01 00:00:00+00"))) STORED HIDDEN;

ALTER TABLE PersonOwnAccount
  ADD ROW DELETION POLICY (OLDER_THAN(expired_ts_generated, INTERVAL 0 DAY));

Utilizzare la cronologia delle modifiche di BigQuery per INSERT, UPDATE ed eliminazioni logiche

Puoi monitorare le modifiche apportate a una tabella BigQuery utilizzando la cronologia delle modifiche. Utilizza la funzione GoogleSQL CHANGES per trovare le righe modificate in un intervallo di tempo specifico. Quindi, utilizza le informazioni sulle righe eliminate con una pipeline ETL inversa. Puoi configurare la pipeline per impostare un indicatore, ad esempio un flag di eliminazione o una data di scadenza, nella tabella Spanner. Questo indicatore contrassegna le righe da eliminare nelle tabelle Spanner.

Utilizza i risultati della funzione di serie temporale CHANGES per decidere quali righe della tabella di origine includere nel caricamento della pipeline ETL inversa.

La pipeline include righe con _CHANGE_TYPE come INSERT o UPDATE come upsert se la riga esiste nella tabella di origine. La riga corrente della tabella di origine fornisce i dati più recenti.

Utilizza le righe con _CHANGE_TYPE come DELETE che non hanno righe esistenti nella tabella di origine per impostare un indicatore nella tabella Spanner, ad esempio un flag di eliminazione o una data di scadenza della riga.

La query di esportazione deve tenere conto dell'ordine di inserimenti ed eliminazioni in BigQuery. Ad esempio, considera una riga eliminata al momento T1 e una nuova riga inserita in un secondo momento T2. Se entrambi mappano la stessa riga della tabella Spanner, l'esportazione deve conservare gli effetti di questi eventi nel loro ordine originale.

Se impostato, l'indicatore delete contrassegna le righe da eliminare nelle tabelle Spanner.

Ad esempio, potresti aggiungere una colonna a una tabella di input Spanner per memorizzare la data di scadenza di ogni riga. Poi, crea una norma di eliminazione che utilizzi queste date di scadenza.

L'esempio seguente mostra come aggiungere una colonna per archiviare le date di scadenza delle righe della tabella.

ALTER TABLE PersonOwnAccount ADD COLUMN expired_ts TIMESTAMP;

ALTER TABLE PersonOwnAccount
  ADD ROW DELETION POLICY (OLDER_THAN(expired_ts, INTERVAL 1 DAY));

Per utilizzare la funzione CHANGES su una tabella in BigQuery, imposta l'opzione enable_change_history della tabella su TRUE:

ALTER TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`
  SET OPTIONS (enable_change_history=TRUE);

L'esempio seguente mostra come utilizzare l'ETL inversa per aggiornare le righe nuove o modificate e impostare la data di scadenza per le righe contrassegnate per l'eliminazione. Un left join con la tabella PersonOwnAccount fornisce alla query informazioni sullo stato attuale di ogni riga.

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
    format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag" : "graph_data_delete_via_reverse_etl"
    }"""
  ) AS
SELECT
  DISTINCT
   IF (changes._CHANGE_TYPE = 'DELETE', changes.id, poa.id) AS id,
   IF (changes._CHANGE_TYPE = 'DELETE', changes.account_id, poa.account_id) AS account_id,
   IF (changes._CHANGE_TYPE = 'DELETE', changes.create_time, poa.create_time) AS create_time,
   IF (changes._CHANGE_TYPE = 'DELETE', changes._CHANGE_TIMESTAMP, NULL) AS expired_ts
FROM
  CHANGES(TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`,
    TIMESTAMP_TRUNC(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY), DAY),
    TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY)) changes
LEFT JOIN `PROJECT_ID.DATASET_NAME.PersonOwnAccount` poa
  ON (poa.id = changes.id
  AND poa.account_id = changes.account_id)
WHERE (changes._CHANGE_TYPE = 'DELETE'
   AND poa.id IS NULL)
   OR (changes._CHANGE_TYPE IN ( 'UPDATE', 'INSERT')
   AND poa.id IS NOT NULL );

La query di esempio utilizza un LEFT JOIN con la tabella di origine per mantenere l'ordine. Questo join garantisce che i record di modifica di DELETE vengano ignorati per le righe eliminate e poi ricreate nell'intervallo della cronologia delle modifiche della query. La pipeline conserva la nuova riga valida.

Quando elimini le righe, la pipeline compila la colonna expired_ts nella riga del grafico Spanner corrispondente utilizzando il timestamp DELETE della colonna _CHANGE_TIMESTAMP. Un criterio di eliminazione delle righe (criterio TTL) in Spanner elimina qualsiasi riga in cui il valore di expired_ts è più di un giorno nel passato.

Per garantire l'affidabilità del sistema, coordina la pianificazione della pipeline, la finestra di analisi delle modifiche e la policy TTL di Spanner. Pianifica l'esecuzione giornaliera della pipeline. La norma TTL di Spanner deve avere una durata superiore a questo intervallo di esecuzione. In questo modo, la pipeline non rielabora un evento DELETE precedente per una riga già rimossa dalla policy TTL di Spanner.

Questo esempio mostra l'intervallo start_timestamp e end_timestamp per le query giornaliere che acquisiscono tutte le modifiche alla tabella BigQuery del giorno UTC precedente. Poiché si tratta di una query batch e la funzione CHANGES ha delle limitazioni, end_timestamp deve essere almeno 10 minuti prima dell'ora attuale. Pertanto, pianifica l'esecuzione di questa query almeno 10 minuti dopo mezzanotte (UTC). Per ulteriori dettagli, consulta la documentazione di CHANGES.

Utilizzare le colonne TTL con il timestamp dell'ultima visualizzazione

Una pipeline ETL inversa imposta una colonna last_seen_ts sul timestamp corrente per ogni riga della tabella Spanner. Quando elimini righe BigQuery, Spanner non aggiorna le righe corrispondenti e la colonna last_seen_ts non cambia. Spanner rimuove quindi le righe con un last_seen_ts obsoleto utilizzando un criterio TTL o DML partizionato, in base a una soglia definita. Prima dell'eliminazione pianificata, le query Spanner possono filtrare le righe con un last_seen_ts precedente a questa soglia. Questo approccio funziona in modo efficace quando i dati del grafico vengono aggiornati regolarmente e gli aggiornamenti mancanti indicano dati obsoleti da eliminare.

Esegui un aggiornamento completo

Prima di caricare i dati da BigQuery, puoi eliminare le tabelle Spanner per riflettere le eliminazioni nelle tabelle di origine. In questo modo, la pipeline non carica in Spanner le righe eliminate dalle tabelle BigQuery di origine durante l'esecuzione successiva della pipeline. Questa potrebbe essere l'opzione più semplice da implementare. Tuttavia, considera il tempo necessario per ricaricare completamente i dati del grafico.

Mantieni una pipeline ETL inversa batch pianificata

Dopo l'esecuzione iniziale della pipeline ETL inversa, i dati vengono caricati in blocco da BigQuery in Spanner Graph e i dati reali continuano a cambiare. I set di dati cambiano e la pipeline aggiunge o rimuove elementi del grafico nel tempo. La pipeline rileva nuovi nodi e aggiunge nuove relazioni di edge oppure l'inferenza dell'AI le genera.

Per garantire che il database Spanner Graph rimanga aggiornato, pianifica e sequenzia l'orchestrazione delle pipeline BigQuery utilizzando una delle seguenti opzioni:

BigQuery Pipelines ti consente di sviluppare, testare, controllare le versioni ed eseguire il deployment di workflow di trasformazione dei dati SQL complessi in BigQuery. Gestisce in modo nativo le dipendenze degli ordini consentendoti di definire le relazioni tra le query nella pipeline. Dataform crea un albero delle dipendenze ed esegue le query nell'ordine corretto. In questo modo, le dipendenze upstream vengono completate prima dell'inizio delle attività downstream.

I flussi di lavoro richiamati da Cloud Scheduler forniscono una soluzione utile e flessibile per orchestrare sequenze di serviziGoogle Cloud , incluse le query BigQuery. Definisci un flusso di lavoro come una serie di passaggi che eseguono ciascuno un job BigQuery. Puoi utilizzare Cloud Scheduler per richiamare questi workflow in base a una pianificazione definita. Gestisci le dipendenze utilizzando la definizione del flusso di lavoro per specificare l'ordine di esecuzione, implementare la logica condizionale, gestire gli errori e passare gli output da una query all'altra.

Le query pianificate, note anche come job di trasferimento BigQuery, in BigQuery consentono di eseguire istruzioni SQL su base ricorrente. Le query pianificate non offrono una gestione degli errori efficace o una gestione dinamica delle dipendenze.

ETL inverso con le query continue di BigQuery

La funzionalità Query continue di BigQuery ti consente di eseguire operazioni BigQuery quasi in tempo reale. La combinazione di EXPORT DATA con le query continue fornisce un metodo alternativo per l'esecuzione di pipeline ETL inverse che evita i job batch pianificati.

Una query continua è una query a esecuzione prolungata che monitora una tabella BigQuery di origine per rilevare nuove righe. Quando BigQuery rileva nuove righe aggiunte alla tabella, trasmette in streaming i risultati della query all'operazione EXPORT DATA.

Questo approccio offre i seguenti vantaggi.

  • Sincronizzazione dei dati quasi in tempo reale: le nuove righe in BigQuery vengono riflesse in Spanner con un ritardo minimo.

  • Overhead di elaborazione batch ridotto: una query continua elimina la necessità di job batch periodici, il che riduce l'overhead di calcolo.

  • Aggiornamenti basati sugli eventi: aggiornamenti dei dati Spanner in risposta alle modifiche effettive in BigQuery.

Una pipeline di query continua richiede un'assegnazione di prenotazione di slot con il job_type di CONTINUOUS. Assegna questo ruolo a livello di progetto o cartella o a livello di organizzazione.

Crea una query continua con l'ETL inversa da BigQuery a Spanner

Configura il parametro start_timestamp della funzione APPENDS per iniziare a elaborare i dati dal punto in cui è stato interrotto il caricamento batch. Questa funzione acquisisce tutte le righe create nella finestra temporale specifica. Nell'esempio seguente, la pipeline imposta arbitrariamente il punto di partenza 10 minuti prima di CURRENT_TIME. Questo timestamp deve rientrare nella finestra di Time Travel di BigQuery.

Esistono diversi metodi per avviare una pipeline di query continua, tra cui:

  1. In BigQuery Studio, selezionando Altro e scegliendo Query continua in Scegli modalità query.

  2. Utilizza l'interfaccia a riga di comando bq e fornisci l'opzione --continuous=true.

EXPORT DATA OPTIONS ( uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format="CLOUD_SPANNER",
  spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag": "reverse-etl-continuous",
      "change_timestamp_column": "create_time"
   }"""
)
AS SELECT id, account_id, _CHANGE_TIMESTAMP as create_time
  FROM
APPENDS(TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`,
  CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE )

Ordine di caricamento non garantito

I dati di Spanner Graph sono costituiti da più tabelle di input. Devi rispettare un ordine di caricamento rigoroso quando le tabelle hanno vincoli di integrità referenziale. Tuttavia, le query continue simultanee non possono controllare l'ordine in cui Spanner aggiunge le righe. Di conseguenza, il caricamento dei dati di Spanner Graph utilizzando query continue è possibile solo per gli schemi grafici con vincoli di integrità referenziale meno rigidi.

Eseguire l'integrazione con le pipeline esistenti

La query continua integra i job batch pianificati esistenti. Ad esempio, utilizza query continua per aggiornamenti quasi in tempo reale e job pianificati per la sincronizzazione o la riconciliazione completa dei dati.

Utilizza la query continua BigQuery per creare pipeline ETL inverse reattive e aggiornate per sincronizzare i dati tra BigQuery e Spanner Graph.

Considerazioni sulle query continue

  • Costo: le query continue comportano costi per l'esecuzione continua delle query e lo streaming dei dati.

  • Gestione degli errori: una pipeline di query continua viene annullata se rileva errori del database, ad esempio una chiave primaria duplicata o una violazione dell'integrità referenziale. Se una pipeline non va a buon fine, devi correggere manualmente i dati nella tabella BigQuery di origine prima di riavviare la query.

  • Eliminazioni e aggiornamenti non gestiti: la funzione APPENDS acquisisce solo gli inserimenti. Non acquisisce eliminazioni o aggiornamenti.

Segui le best practice per l'ETL inversa

Per ottenere risultati ottimali, procedi nel seguente modo.

  • Scegli una strategia per evitare errori di integrità referenziale quando carichi i dati degli archi.

  • Progetta la pipeline di dati complessiva per evitare bordi sospesi. Gli archi sospesi possono compromettere l'efficienza delle query Spanner Graph e l'integrità della struttura del grafico. Per ulteriori informazioni, consulta la sezione Evitare bordi sospesi.

  • Segui i consigli per l'ottimizzazione dell'esportazione di Spanner.

  • Se carichi una grande quantità di dati, valuta la possibilità di dividere la pipeline in più pipeline più piccole per evitare di raggiungere la quota predefinita di sei ore per il tempo di esecuzione delle query BigQuery. Per saperne di più, consulta la sezione Limiti dei job di query BigQuery.

  • Per i caricamenti di grandi quantità di dati, aggiungi indici e vincoli di chiave esterna dopo aver completato il caricamento iniziale dei dati in blocco. Questa pratica migliora le prestazioni di caricamento dei dati perché i vincoli di chiave esterna richiedono letture aggiuntive per la convalida e gli indici richiedono scritture aggiuntive. Queste operazioni aumentano il numero di partecipanti alla transazione, il che può rallentare il processo di caricamento dei dati.

  • Abilita la scalabilità automatica in Spanner per velocizzare i tempi di caricamento dei dati in un'istanza. Poi, configura il parametro priority di Spanner nella sezione spanner_options del comando BigQuery EXPORT DATA su HIGH. Per ulteriori informazioni, consulta Panoramica della scalabilità automatica di Spanner, Configura le esportazioni con l'opzione spanner_options e RequestOptions.priority.

  • Per i caricamenti di grandi quantità di dati, crea punti di divisione per dividere in anticipo il database. In questo modo Spanner si prepara a un aumento del throughput.

  • Configura la priorità delle richieste di Spanner per il caricamento dei dati nella definizione della pipeline.

Passaggi successivi