Utiliser l'ETL inversé pour charger des données depuis BigQuery dans Spanner Graph

Ce document explique comment utiliser des pipelines d'extraction, de transformation et de chargement inversé (ETL) pour transférer et synchroniser en continu des données de graphiques de BigQuery vers Spanner Graph. Il aborde les aspects clés suivants :

Pour utiliser l'ETL inversé afin d'exporter des données de BigQuery vers Spanner, consultez Exporter des données vers Spanner.

BigQuery effectue des manipulations de données complexes à grande échelle en tant que plate-forme de traitement analytique, tandis que Spanner est optimisé pour les cas d'utilisation qui nécessitent un nombre élevé de requêtes par seconde et une faible latence de diffusion. Spanner Graph et BigQuery s'intègrent efficacement pour préparer les données graphiques dans les pipelines d'analyse BigQuery, ce qui permet à Spanner de diffuser des traversées de graphiques à faible latence.

Avant de commencer

  1. Créez une instance Spanner avec une base de données contenant des données graphiques. Pour en savoir plus, consultez Configurer et interroger Spanner Graph.

  2. Dans BigQuery, créez une réservation de slots de niveau Enterprise ou Enterprise Plus. Vous pouvez réduire les coûts de calcul BigQuery lorsque vous exportez des données vers Spanner Graph. Pour ce faire, définissez une capacité d'emplacements de référence égale à zéro et activez l'autoscaling.

  3. Attribuez aux utilisateurs des rôles IAM (Identity and Access Management) incluant les autorisations nécessaires pour effectuer l'ensemble des tâches du présent document.

Rôles requis

Pour obtenir les autorisations nécessaires à l'exportation des données graphiques BigQuery vers Spanner Graph, demandez à votre administrateur de vous accorder les rôles IAM suivants sur votre projet :

Pour en savoir plus sur l'attribution de rôles, consultez Gérer l'accès aux projets, aux dossiers et aux organisations.

Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.

Cas d'utilisation de l'ETL inversé

Voici quelques exemples de cas d'utilisation. Après avoir analysé et traité les données dans BigQuery, vous pouvez les transférer vers Spanner Graph à l'aide de l'ETL inversé.

Agrégation et synthèse des données : utilisez BigQuery pour calculer des agrégats sur des données précises afin de les rendre plus adaptées aux cas d'utilisation opérationnels.

Transformation et enrichissement des données : utilisez BigQuery pour nettoyer et standardiser les données reçues de différentes sources de données.

Filtrer et sélectionner des données : utilisez BigQuery pour filtrer un grand ensemble de données à des fins d'analyse. Par exemple, vous pouvez filtrer les données qui ne sont pas nécessaires pour les applications en temps réel.

Prétraitement et ingénierie des caractéristiques : dans BigQuery, utilisez la fonction ML.TRANSFORM pour transformer les données ou la fonction ML.FEATURE_CROSS pour créer des croisements de caractéristiques à partir des caractéristiques d'entrée. Ensuite, utilisez l'ETL inversé pour déplacer les données résultantes dans Spanner Graph.

Comprendre le pipeline ETL inversé

Les données sont transférées de BigQuery vers Spanner Graph dans un pipeline ETL inversé en deux étapes :

  1. BigQuery utilise des emplacements attribués au job de pipeline pour extraire et transformer les données sources.

  2. Le pipeline ETL inversé BigQuery utilise les API Spanner pour charger des données dans une instance Spanner provisionnée.

Le schéma suivant montre les étapes d'un pipeline Reverse ETL :

Diagramme illustrant les trois étapes principales du transfert de données de BigQuery vers Spanner Graph dans un pipeline ETL inversé.

Figure 1 : Processus de pipeline ETL inversé BigQuery

Gérer les modifications des données graphiques

Vous pouvez utiliser l'ETL inversé pour effectuer les opérations suivantes :

  • Charger un ensemble de données graphiques depuis BigQuery vers Spanner Graph.

  • Synchronisez les données Spanner Graph avec les mises à jour continues d'un ensemble de données dans BigQuery.

Vous configurez un pipeline ETL inversé avec une requête SQL pour spécifier les données sources et la transformation à appliquer. Le pipeline charge toutes les données qui satisfont la clause WHERE de l'instruction SELECT dans Spanner à l'aide d'une opération upsert. Une opération d'upsert équivaut à des instructions INSERT OR UPDATE. Il insère de nouvelles lignes et met à jour les lignes existantes dans les tables qui stockent les données graphiques. Le pipeline base les lignes nouvelles et mises à jour sur une clé primaire de table Spanner.

Insérer et mettre à jour des données pour les tables avec des dépendances d'ordre de chargement

Les bonnes pratiques de conception de schéma Spanner Graph recommandent d'utiliser des tables entrelacées et des clés étrangères. Si vous utilisez des tables entrelacées ou des clés étrangères appliquées, vous devez charger les données de nœuds et d'arêtes dans un ordre spécifique. Cela garantit que les lignes référencées existent avant la création de la ligne de référence. Pour en savoir plus, consultez Créer des tables entrelacées.

Le schéma de table d'entrée de graphique suivant utilise une table entrelacée et une contrainte de clé étrangère pour modéliser la relation entre une personne et ses comptes :

CREATE TABLE Person (
  id    INT64 NOT NULL,
  name  STRING(MAX)
) PRIMARY KEY (id);

CREATE TABLE Account (
  id           INT64 NOT NULL,
  create_time  TIMESTAMP,
  is_blocked   BOOL,
  type        STRING(MAX)
) PRIMARY KEY (id);

CREATE TABLE PersonOwnAccount (
  id           INT64 NOT NULL,
  account_id   INT64 NOT NULL,
  create_time  TIMESTAMP,
  CONSTRAINT FK_Account FOREIGN KEY (account_id) REFERENCES Account (id)
) PRIMARY KEY (id, account_id),
  INTERLEAVE IN PARENT Person ON DELETE CASCADE;

CREATE PROPERTY GRAPH FinGraph
  NODE TABLES (
    Person,
    Account
  )
  EDGE TABLES (
    PersonOwnAccount
      SOURCE KEY (id) REFERENCES Person
      DESTINATION KEY (account_id) REFERENCES Account
      LABEL Owns
  );

Dans cet exemple de schéma, PersonOwnAccount est une table entrelacée dans Person. Chargez les éléments du tableau Person avant ceux du tableau PersonOwnAccount. De plus, la contrainte de clé étrangère sur PersonOwnAccount garantit qu'une ligne correspondante existe dans Account, la cible de la relation d'arête. Par conséquent, chargez la table Account avant la table PersonOwnAccount. La liste suivante récapitule les dépendances de l'ordre de chargement de ce schéma :

Pour charger les données, procédez comme suit :

  1. Chargez Person avant PersonOwnAccount.
  2. Chargez Account avant PersonOwnAccount.

Spanner applique les contraintes d'intégrité référentielle dans l'exemple de schéma. Si le pipeline tente de créer une ligne dans la table PersonOwnAccount sans ligne correspondante dans la table Person ou Account, Spanner renvoie une erreur. Le pipeline échoue alors.

Cet exemple de pipeline ETL inversé utilise des instructions EXPORTDATA dans BigQuery pour exporter les données des tables Person, Account et PersonOwnAccount d'un ensemble de données afin de répondre aux dépendances de l'ordre de chargement :

BEGIN
EXPORT DATA OPTIONS (
    uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
    format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "Person",
      "priority": "HIGH",
      "tag" : "graph_data_load_person"
    }"""
  ) AS
  SELECT
    id,
    name
  FROM
    DATASET_NAME.Person;

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
  spanner_options="""{
    "table": "Account",
    "priority": "HIGH",
    "tag" : "graph_data_load_account"
  }"""
) AS
SELECT
  id,
  create_time,
  is_blocked,
  type
FROM
  DATASET_NAME.Account;

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
  spanner_options="""{
    "table": "PersonOwnAccount",
    "priority": "HIGH",
    "tag" : "graph_data_load_person_own_account"
  }"""
) AS
SELECT
  id,
  account_id,
  create_time
FROM
  DATASET_NAME.PersonOwnAccount;
END;

Synchroniser les données

Pour synchroniser BigQuery avec Spanner Graph, utilisez des pipelines ETL inversés. Vous pouvez configurer un pipeline pour effectuer l'une des opérations suivantes :

  • Appliquez les insertions et les mises à jour de la source BigQuery à la table cible Spanner Graph. Vous pouvez ajouter des éléments de schéma aux tables cibles pour communiquer logiquement les suppressions et supprimer les lignes de table cible selon une programmation.

  • Utilisez une fonction de série temporelle qui applique des opérations d'insertion et de mise à jour, et identifie les opérations de suppression.

Contraintes d'intégrité référentielle

Contrairement à Spanner, BigQuery n'applique pas de contraintes de clé primaire ni de clé étrangère. Si vos données BigQuery ne respectent pas les contraintes que vous créez sur vos tables Spanner, le pipeline ETL inversé peut échouer lors du chargement de ces données.

L'ETL inversé regroupe automatiquement les données en lots qui ne dépassent pas la limite maximale de mutations par commit et applique les lots de manière atomique à une table Spanner dans un ordre arbitraire. Si un lot contient des données qui échouent à un contrôle d'intégrité référentielle, Spanner ne le charge pas. Par exemple, une ligne enfant imbriquée sans ligne parente ou une colonne de clé étrangère forcée sans valeur correspondante dans la colonne référencée. Si un lot échoue à un contrôle, le pipeline échoue avec une erreur et arrête de charger les lots.

Comprendre les erreurs de contrainte d'intégrité référentielle

Vous trouverez ci-dessous des exemples d'erreurs de contrainte d'intégrité référentielle que vous pouvez rencontrer :

Résoudre les erreurs de contrainte de clé étrangère
  • Erreur : "La contrainte de clé étrangère FK_Account est enfreinte dans la table PersonOwnAccount. Impossible de trouver les valeurs référencées dans Account(id)

  • Cause : l'insertion d'une ligne dans la table PersonOwnAccount a échoué, car il manque une ligne correspondante dans la table Account, requise par la clé étrangère FK_Account.

Résoudre les erreurs liées à l'absence de ligne parente
  • Erreur : "La ligne parente de la ligne [15,1] du tableau PersonOwnAccount est manquante"

  • Cause : l'insertion d'une ligne dans PersonOwnAccount (id: 15 et account_id: 1) a échoué, car une ligne parente est manquante dans la table Person (id: 15).

Pour réduire le risque d'erreurs d'intégrité référentielle, envisagez les options suivantes. Chaque option présente des avantages et des inconvénients.

  • Relâchez les contraintes pour permettre à Spanner Graph de charger les données.
  • Ajoutez une logique à votre pipeline pour omettre les lignes qui ne respectent pas les contraintes d'intégrité référentielle.

Relâcher l'intégrité référentielle

Pour éviter les erreurs d'intégrité référentielle lors du chargement des données, vous pouvez assouplir les contraintes afin que Spanner n'applique pas l'intégrité référentielle.

  • Vous pouvez créer des tables entrelacées avec la clause INTERLEAVE IN pour utiliser les mêmes caractéristiques d'entrelacement des lignes physiques. Si vous utilisez INTERLEAVE IN au lieu de INTERLEAVE IN PARENT, Spanner n'applique pas l'intégrité référentielle, mais les requêtes bénéficient de la colocalisation des tables associées.

  • Vous pouvez créer des clés étrangères informatives à l'aide de l'option NOT ENFORCED. L'option NOT ENFORCED offre des avantages en termes d'optimisation des requêtes. Toutefois, Spanner n'applique pas l'intégrité référentielle.

Par exemple, pour créer la table d'entrée des arêtes sans vérification de l'intégrité référentielle, vous pouvez utiliser le LDD suivant :

CREATE TABLE PersonOwnAccount (
  id          INT64 NOT NULL,
  account_id  INT64 NOT NULL,
  create_time TIMESTAMP,
  CONSTRAINT FK_Account FOREIGN KEY (account_id) REFERENCES Account (id) NOT ENFORCED
) PRIMARY KEY (id, account_id),
INTERLEAVE IN Person;

Respecter l'intégrité référentielle dans les pipelines ETL inversés

Pour vous assurer que le pipeline ne charge que les lignes qui satisfont aux vérifications de l'intégrité référentielle, n'incluez que les lignes PersonOwnAccount qui ont des lignes correspondantes dans les tables Person et Account. Ensuite, conservez l'ordre de chargement afin que Spanner charge les lignes Person et Account avant les lignes PersonOwnAccount qui y font référence.

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag" : "graph_data_load_person_own_account"
    }"""
  ) AS
  SELECT
    poa.id,
    poa.account_id,
    poa.create_time
  FROM `PROJECT_ID.DATASET_NAME.PersonOwnAccount` poa
    JOIN `PROJECT_ID.DATASET_NAME.Person` p ON (poa.id = p.id)
    JOIN `PROJECT_ID.DATASET_NAME.Account` a ON (poa.account_id = a.id)
  WHERE poa.id = p.id
    AND poa.account_id = a.id;

Supprimer des éléments de graphique

Les pipelines ETL inversés utilisent des opérations upsert. Étant donné que les opérations d'upsert sont équivalentes aux instructions INSERT OR UPDATE, un pipeline ne peut synchroniser que les lignes qui existent dans les données sources au moment de l'exécution. Cela signifie que le pipeline exclut les lignes supprimées. Si vous supprimez des données de BigQuery, un pipeline ETL inversé ne peut pas supprimer directement les mêmes données de Spanner Graph.

Vous pouvez utiliser l'une des options suivantes pour gérer les suppressions dans les tables sources BigQuery :

Effectuer une suppression logique ou réversible dans la source

Pour marquer logiquement des lignes à supprimer, utilisez un indicateur de suppression dans BigQuery. Créez ensuite une colonne dans la table Spanner cible dans laquelle vous pourrez propager l'indicateur. Lorsque l'ETL inversé applique les mises à jour du pipeline, supprimez les lignes comportant cet indicateur dans Spanner. Vous pouvez trouver et supprimer explicitement ces lignes à l'aide du LMD partitionné. Vous pouvez également supprimer des lignes de manière implicite en configurant une colonne TTL (Time To Live) avec une date qui dépend de la colonne d'indicateur de suppression. Écrivez des requêtes Spanner pour exclure ces lignes supprimées de manière logique. Spanner peut ainsi exclure ces lignes des résultats avant la suppression planifiée. Une fois le pipeline d'ETL inversé exécuté, Spanner reflète les suppressions logiques dans ses lignes. Vous pouvez ensuite supprimer des lignes de BigQuery.

Cet exemple ajoute une colonne is_deleted à la table PersonOwnAccount dans Spanner. Il ajoute ensuite une colonne expired_ts_generated qui dépend de la valeur is_deleted. La règle TTL planifie la suppression des lignes concernées, car la date de la colonne générée est antérieure au seuil DELETION POLICY.

ALTER TABLE PersonOwnAccount
  ADD COLUMN is_deleted BOOL DEFAULT (FALSE);

ALTER TABLE PersonOwnAccount ADD COLUMN
  expired_ts_generated TIMESTAMP AS (IF(is_deleted,
    TIMESTAMP("1970-01-01 00:00:00+00"),
    TIMESTAMP("9999-01-01 00:00:00+00"))) STORED HIDDEN;

ALTER TABLE PersonOwnAccount
  ADD ROW DELETION POLICY (OLDER_THAN(expired_ts_generated, INTERVAL 0 DAY));

Utiliser l'historique des modifications BigQuery pour les opérations INSERT, UPDATE et les suppressions logiques

Vous pouvez suivre les modifications apportées à une table BigQuery à l'aide de son historique des modifications. Utilisez la fonction GoogleSQL CHANGES pour trouver les lignes qui ont été modifiées au cours d'un intervalle de temps spécifique. Utilisez ensuite les informations sur les lignes supprimées avec un pipeline ETL inversé. Vous pouvez configurer le pipeline pour définir un indicateur, tel qu'un indicateur de suppression ou une date d'expiration, dans la table Spanner. Cet indicateur marque les lignes à supprimer dans les tables Spanner.

Utilisez les résultats de la fonction de série temporelle CHANGES pour déterminer les lignes de la table source à inclure dans le chargement de votre pipeline d'ETL inversé.

Le pipeline inclut les lignes avec _CHANGE_TYPE comme INSERT ou UPDATE comme upserts si la ligne existe dans la table source. La ligne actuelle du tableau source fournit les données les plus récentes.

Utilisez les lignes avec _CHANGE_TYPE comme DELETE qui n'ont pas de lignes existantes dans la table source pour définir un indicateur dans la table Spanner, tel qu'un indicateur de suppression ou une date d'expiration de ligne.

Votre requête d'exportation doit tenir compte de l'ordre des insertions et des suppressions dans BigQuery. Prenons l'exemple d'une ligne supprimée à l'heure T1 et d'une nouvelle ligne insérée à l'heure T2. Si les deux événements correspondent à la même ligne de table Spanner, l'exportation doit préserver les effets de ces événements dans leur ordre d'origine.

Si elle est définie, l'indicateur delete marque les lignes à supprimer dans les tables Spanner.

Par exemple, vous pouvez ajouter une colonne à une table d'entrée Spanner pour stocker la date d'expiration de chaque ligne. Créez ensuite une règle de suppression qui utilise ces dates d'expiration.

L'exemple suivant montre comment ajouter une colonne pour stocker les dates d'expiration des lignes de la table.

ALTER TABLE PersonOwnAccount ADD COLUMN expired_ts TIMESTAMP;

ALTER TABLE PersonOwnAccount
  ADD ROW DELETION POLICY (OLDER_THAN(expired_ts, INTERVAL 1 DAY));

Pour utiliser la fonction CHANGES sur une table dans BigQuery, définissez l'option enable_change_history de la table sur TRUE :

ALTER TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`
  SET OPTIONS (enable_change_history=TRUE);

L'exemple suivant montre comment utiliser l'ETL inversé pour mettre à jour les lignes nouvelles ou modifiées, et définir la date d'expiration des lignes marquées pour suppression. Une jointure à gauche avec la table PersonOwnAccount fournit à la requête des informations sur l'état actuel de chaque ligne.

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
    format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag" : "graph_data_delete_via_reverse_etl"
    }"""
  ) AS
SELECT
  DISTINCT
   IF (changes._CHANGE_TYPE = 'DELETE', changes.id, poa.id) AS id,
   IF (changes._CHANGE_TYPE = 'DELETE', changes.account_id, poa.account_id) AS account_id,
   IF (changes._CHANGE_TYPE = 'DELETE', changes.create_time, poa.create_time) AS create_time,
   IF (changes._CHANGE_TYPE = 'DELETE', changes._CHANGE_TIMESTAMP, NULL) AS expired_ts
FROM
  CHANGES(TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`,
    TIMESTAMP_TRUNC(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY), DAY),
    TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY)) changes
LEFT JOIN `PROJECT_ID.DATASET_NAME.PersonOwnAccount` poa
  ON (poa.id = changes.id
  AND poa.account_id = changes.account_id)
WHERE (changes._CHANGE_TYPE = 'DELETE'
   AND poa.id IS NULL)
   OR (changes._CHANGE_TYPE IN ( 'UPDATE', 'INSERT')
   AND poa.id IS NOT NULL );

L'exemple de requête utilise un LEFT JOIN avec la table source pour préserver l'ordre. Cette jointure garantit que les enregistrements de modifications DELETE sont ignorés pour les lignes supprimées, puis recréées dans l'intervalle de l'historique des modifications de la requête. Le pipeline conserve la nouvelle ligne valide.

Lorsque vous supprimez des lignes, le pipeline remplit la colonne expired_ts de la ligne Spanner Graph correspondante à l'aide du code temporel DELETE de la colonne _CHANGE_TIMESTAMP. Dans Spanner, une règle de suppression de lignes (règle TTL) supprime toute ligne dont la valeur expired_ts est antérieure de plus d'un jour.

Pour garantir la fiabilité du système, coordonnez la programmation du pipeline, la période d'analyse des modifications et la règle TTL Spanner. Programmez le pipeline pour qu'il s'exécute tous les jours. La durée de la règle TTL Spanner doit être supérieure à cet intervalle d'exécution. Cela empêche le pipeline de retraiter un événement DELETE précédent pour une ligne déjà supprimée par la règle TTL Spanner.

Cet exemple montre l'intervalle start_timestamp et end_timestamp pour les requêtes quotidiennes qui capturent toutes les modifications apportées aux tables BigQuery depuis la veille (heure UTC). Comme il s'agit d'une requête par lot et que la fonction CHANGES présente des limites, la valeur end_timestamp doit être définie au moins 10 minutes avant l'heure actuelle. Par conséquent, planifiez l'exécution de cette requête au moins 10 minutes après minuit UTC. Pour en savoir plus, consultez la documentation CHANGES.

Utiliser des colonnes TTL avec le code temporel de la dernière consultation

Un pipeline ETL inversé définit une colonne last_seen_ts sur le code temporel actuel pour chaque ligne de la table Spanner. Lorsque vous supprimez des lignes BigQuery, Spanner ne met pas à jour les lignes correspondantes et la colonne last_seen_ts ne change pas. Spanner supprime ensuite les lignes dont la valeur last_seen_ts est obsolète à l'aide d'une règle TTL ou d'un LMD partitionné, en fonction d'un seuil défini. Avant la suppression planifiée, les requêtes Spanner peuvent filtrer les lignes dont la valeur last_seen_ts est antérieure à ce seuil. Cette approche fonctionne efficacement lorsque les données du graphique sont régulièrement mises à jour et que les mises à jour manquantes indiquent que les données sont obsolètes et doivent être supprimées.

Effectuer une actualisation complète

Avant de charger des données depuis BigQuery, vous pouvez supprimer des tables Spanner pour refléter les suppressions dans les tables sources. Cela empêche le pipeline de charger dans Spanner les lignes supprimées des tables BigQuery sources lors de la prochaine exécution du pipeline. Il s'agit peut-être de l'option la plus simple à implémenter. Toutefois, tenez compte du temps nécessaire pour recharger complètement vos données de graphique.

Gérer un pipeline ETL inversé par lot planifié

Une fois que l'exécution initiale de votre pipeline Reverse ETL a chargé les données de BigQuery dans Spanner Graph, les données réelles continuent de changer. Les ensembles de données changent, et le pipeline ajoute ou supprime des éléments de graphique au fil du temps. Le pipeline découvre de nouveaux nœuds et ajoute de nouvelles relations d'arête, ou l'inférence de l'IA les génère.

Pour que la base de données Spanner Graph reste à jour, planifiez et séquencez l'orchestration des pipelines BigQuery à l'aide de l'une des options suivantes :

BigQuery Pipelines vous permet de développer, de tester, de contrôler les versions et de déployer des workflows complexes de transformation de données SQL dans BigQuery. Il gère nativement les dépendances d'ordre en vous permettant de définir des relations entre les requêtes de votre pipeline. Dataform crée un arbre de dépendances et exécute vos requêtes dans le bon ordre. Cela garantit que les dépendances en amont sont terminées avant le début des tâches en aval.

Les workflows appelés par Cloud Scheduler constituent une solution utile et flexible pour orchestrer des séquences de servicesGoogle Cloud , y compris des requêtes BigQuery. Définissez un workflow comme une série d'étapes exécutant chacune un job BigQuery. Vous pouvez utiliser Cloud Scheduler pour appeler ces workflows selon un calendrier défini. Gérez les dépendances à l'aide de la définition du workflow pour spécifier l'ordre d'exécution, implémenter une logique conditionnelle, gérer les erreurs et transmettre les sorties d'une requête à une autre.

Les requêtes programmées, également appelées jobs de transfert BigQuery, vous permettent d'exécuter des instructions SQL de manière récurrente dans BigQuery. Les requêtes planifiées n'offrent pas de gestion robuste des erreurs ni de gestion dynamique des dépendances.

ETL inversé avec les requêtes continues BigQuery

La fonctionnalité Requêtes continues BigQuery vous permet d'exécuter des opérations BigQuery quasiment en temps réel. La combinaison de EXPORT DATA et des requêtes continues offre une autre méthode pour exécuter des pipelines ETL inversés qui évite les jobs par lot planifiés.

Une requête continue est une requête de longue durée qui surveille une table source BigQuery pour détecter les nouvelles lignes. Lorsque BigQuery détecte de nouvelles lignes ajoutées à la table, il diffuse les résultats de la requête vers l'opération EXPORT DATA.

Cette approche offre les avantages suivants :

  • Synchronisation des données en temps quasi réel : les nouvelles lignes de BigQuery sont reflétées dans Spanner avec un délai minimal.

  • Réduction des frais généraux de traitement par lot : une requête continue élimine le besoin de jobs par lot périodiques, ce qui réduit les frais généraux de calcul.

  • Mises à jour déclenchées par des événements : les données Spanner sont mises à jour en fonction des modifications réelles dans BigQuery.

Un pipeline de requête continue nécessite une attribution de réservation d'emplacement avec job_type défini sur CONTINUOUS. Attribuez ce rôle au niveau du projet ou du dossier, ou au niveau de l'organisation.

Créer une requête continue avec ETL inversé de BigQuery vers Spanner

Configurez le paramètre start_timestamp de la fonction APPENDS pour commencer à traiter les données là où le chargement par lot s'est arrêté. Cette fonction capture toutes les lignes créées dans la période spécifique. Dans l'exemple suivant, le pipeline définit arbitrairement le point de départ sur 10 minutes avant le CURRENT_TIME. Ce code temporel doit se trouver dans la période de déplacement temporel BigQuery.

Il existe plusieurs méthodes pour démarrer un pipeline de requête continue, y compris les suivantes :

  1. Dans BigQuery Studio, en sélectionnant Plus, puis Requête continue sous Choisir le mode de requête.

  2. Utilisez la CLI bq et fournissez l'option --continuous=true.

EXPORT DATA OPTIONS ( uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format="CLOUD_SPANNER",
  spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag": "reverse-etl-continuous",
      "change_timestamp_column": "create_time"
   }"""
)
AS SELECT id, account_id, _CHANGE_TIMESTAMP as create_time
  FROM
APPENDS(TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`,
  CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE )

L'ordre de chargement n'est pas garanti

Les données Spanner Graph sont composées de plusieurs tables d'entrée. Vous devez respecter un ordre de chargement strict lorsque les tables comportent des contraintes d'intégrité référentielle. Toutefois, les requêtes continues simultanées ne peuvent pas contrôler l'ordre dans lequel Spanner ajoute des lignes. Par conséquent, le chargement des données Spanner Graph à l'aide de requêtes continues n'est possible que pour les schémas de graphes avec des contraintes d'intégrité référentielle assouplies.

Intégrer aux pipelines existants

Les requêtes continues complètent les jobs par lot programmés existants. Par exemple, utilisez une requête continue pour les mises à jour quasiment en temps réel et des tâches planifiées pour la synchronisation ou la réconciliation complète des données.

Utilisez une requête continue BigQuery pour créer des pipelines d'ETL inversé réactifs et à jour afin de synchroniser les données entre BigQuery et Spanner Graph.

Considérations relatives aux requêtes continues

  • Coût : les requêtes continues entraînent des coûts pour l'exécution continue des requêtes et le streaming de données.

  • Gestion des erreurs : un pipeline de requête continue est annulé s'il rencontre des erreurs de base de données, comme une clé primaire en double ou une violation de l'intégrité référentielle. Si un pipeline échoue, vous devez corriger manuellement les données de la table BigQuery source avant de relancer la requête.

  • Suppression et mise à jour non gérées : la fonction APPENDS ne capture que les insertions. Il ne capture pas les suppressions ni les modifications.

Suivre les bonnes pratiques de l'ETL inversé

Pour obtenir les meilleurs résultats possible, procédez comme suit.

  • Choisissez une stratégie pour éviter les erreurs d'intégrité référentielle lorsque vous chargez des données périphériques.

  • Concevez votre pipeline de données global pour éviter les arêtes isolées. Les arêtes isolées peuvent compromettre l'efficacité des requêtes Spanner Graph et l'intégrité de la structure du graphique. Pour en savoir plus, consultez Éviter les arêtes orphelines.

  • Suivez les recommandations d'optimisation de l'exportation Spanner.

  • Si vous chargez une grande quantité de données, envisagez de diviser le pipeline en plusieurs pipelines plus petits pour éviter d'atteindre le quota par défaut de six heures de temps d'exécution des requêtes BigQuery. Pour en savoir plus, consultez Limites des jobs de requête BigQuery.

  • Pour les chargements de données volumineux, ajoutez des index et des contraintes de clé étrangère une fois le chargement groupé initial des données terminé. Cette pratique améliore les performances de chargement des données, car les contraintes de clé étrangère nécessitent des lectures supplémentaires pour la validation et les index nécessitent des écritures supplémentaires. Ces opérations augmentent le nombre de participants aux transactions, ce qui peut ralentir le processus de chargement des données.

  • Activez l'autoscaling dans Spanner pour accélérer les temps de chargement des données dans une instance. Configurez ensuite le paramètre priority de Spanner dans la section spanner_options de la commande EXPORT DATA BigQuery sur HIGH. Pour en savoir plus, consultez Présentation de l'autoscaling Spanner, Configurer les exportations avec l'option spanner_options et RequestOptions.priority.

  • Pour les chargements de données volumineux, créez des points de fractionnement afin de fractionner votre base de données au préalable. Cela prépare Spanner à un débit accru.

  • Configurez la priorité des requêtes Spanner pour le chargement des données dans la définition du pipeline.

Étapes suivantes