Usar ETL inverso para cargar datos de BigQuery en Spanner Graph

En este documento se describe cómo usar las canalizaciones de extracción, transformación y carga (ETL) inversas para mover y sincronizar continuamente datos de grafos de BigQuery a Spanner Graph. Abarca los siguientes aspectos clave:

Para usar la ETL inversa y exportar datos de BigQuery a Spanner, consulta Exportar datos a Spanner.

BigQuery realiza manipulaciones de datos complejas a gran escala como plataforma de procesamiento analítico, mientras que Spanner está optimizado para casos prácticos que requieren un alto QPS y una latencia de servicio baja. Spanner Graph y BigQuery se integran de forma eficaz para preparar datos de grafos en las canalizaciones de analíticas de BigQuery, lo que permite a Spanner ofrecer recorridos de grafos de baja latencia.

Antes de empezar

  1. Crea una instancia de Spanner con una base de datos que contenga datos de gráficos. Para obtener más información, consulta Configurar y consultar Spanner Graph.

  2. En BigQuery, crea una reserva de ranuras de nivel Enterprise o Enterprise Plus. Puedes reducir los costes de computación de BigQuery cuando ejecutas exportaciones a Spanner Graph. Para ello, define una capacidad de cero para el espacio publicitario de referencia y habilita el autoescalado.

  3. Concede roles de gestión de identidades y accesos (IAM) que proporcionen a los usuarios los permisos necesarios para realizar cada tarea de este documento.

Roles obligatorios

Para obtener los permisos que necesitas para exportar datos de gráficos de BigQuery a Spanner Graph, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos en tu proyecto:

Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar acceso a proyectos, carpetas y organizaciones.

También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.

Casos prácticos de ETL inverso

A continuación, se muestran algunos ejemplos de casos prácticos. Después de analizar y procesar los datos en BigQuery, puedes moverlos a Spanner Graph mediante ETL inverso.

Agregación y resumen de datos: usa BigQuery para calcular agregaciones de datos granulares y hacerlos más adecuados para casos prácticos operativos.

Transformación y enriquecimiento de datos: usa BigQuery para limpiar y estandarizar los datos recibidos de diferentes fuentes.

Filtrado y selección de datos: usa BigQuery para filtrar un conjunto de datos grande con fines analíticos. Por ejemplo, puede excluir los datos que no sean necesarios para las aplicaciones en tiempo real.

Preprocesamiento y creación de funciones: en BigQuery, usa la función ML.TRANSFORM para transformar datos o la función ML.FEATURE_CROSS para crear combinaciones de funciones de funciones de entrada. A continuación, usa la ETL inversa para mover los datos resultantes a Spanner Graph.

Información sobre el flujo de procesamiento de ETL inverso

Los datos se mueven de BigQuery a Spanner Graph en un flujo de trabajo de ETL inverso en dos pasos:

  1. BigQuery usa slots asignados a la tarea de la canalización para extraer y transformar los datos de origen.

  2. El flujo de procesamiento de ETL inverso de BigQuery usa APIs de Spanner para cargar datos en una instancia de Spanner aprovisionada.

En el siguiente diagrama se muestran los pasos de una canalización de ETL inversa:

Diagrama que muestra los tres pasos principales cuando los datos se transfieren de BigQuery a Spanner Graph en una canalización de ETL inversa.

Imagen 1. Proceso de la canalización de ETL inversa de BigQuery

Gestionar los cambios en los datos de gráficos

Puedes usar ETL inverso para lo siguiente:

  • Carga un conjunto de datos de grafos de BigQuery en Spanner Graph.

  • Sincroniza los datos de gráficos de Spanner con las actualizaciones continuas de un conjunto de datos de BigQuery.

Para configurar una canalización de ETL inversa, debes usar una consulta SQL para especificar los datos de origen y la transformación que se va a aplicar. La canalización carga en Spanner todos los datos que cumplen la cláusula WHERE de la instrucción SELECT mediante una operación de inserción o actualización. Una operación de upsert equivale a las instrucciones INSERT OR UPDATE. Inserta filas nuevas y actualiza las filas de las tablas que almacenan datos de gráficos. La canalización basa las filas nuevas y actualizadas en una clave principal de tabla de Spanner.

Insertar y actualizar datos de tablas con dependencias de orden de carga

En las prácticas recomendadas para el diseño de esquemas de Spanner Graph se recomienda usar tablas intercaladas y claves externas. Si usas tablas intercaladas o claves externas obligatorias, debes cargar los datos de nodos y aristas en un orden específico. De esta forma, te aseguras de que las filas a las que se hace referencia existan antes de crear la fila que hace referencia. Para obtener más información, consulta Crear tablas intercaladas.

El siguiente ejemplo de esquema de tabla de entrada de un gráfico usa una tabla intercalada y una restricción de clave externa para modelar la relación entre una persona y sus cuentas:

CREATE TABLE Person (
  id    INT64 NOT NULL,
  name  STRING(MAX)
) PRIMARY KEY (id);

CREATE TABLE Account (
  id           INT64 NOT NULL,
  create_time  TIMESTAMP,
  is_blocked   BOOL,
  type        STRING(MAX)
) PRIMARY KEY (id);

CREATE TABLE PersonOwnAccount (
  id           INT64 NOT NULL,
  account_id   INT64 NOT NULL,
  create_time  TIMESTAMP,
  CONSTRAINT FK_Account FOREIGN KEY (account_id) REFERENCES Account (id)
) PRIMARY KEY (id, account_id),
  INTERLEAVE IN PARENT Person ON DELETE CASCADE;

CREATE PROPERTY GRAPH FinGraph
  NODE TABLES (
    Person,
    Account
  )
  EDGE TABLES (
    PersonOwnAccount
      SOURCE KEY (id) REFERENCES Person
      DESTINATION KEY (account_id) REFERENCES Account
      LABEL Owns
  );

En este esquema de ejemplo, PersonOwnAccount es una tabla intercalada en Person. Carga los elementos de la tabla Person antes que los de la tabla PersonOwnAccount. Además, la restricción de clave externa en PersonOwnAccount asegura que haya una fila coincidente en Account, el destino de la relación de arista. Por lo tanto, carga la tabla Account antes que la tabla PersonOwnAccount. En la siguiente lista se resumen las dependencias del orden de carga de este esquema:

Sigue estos pasos para cargar los datos:

  1. Carga Person antes de PersonOwnAccount.
  2. Carga Account antes de PersonOwnAccount.

Spanner aplica las restricciones de integridad referencial en el esquema de ejemplo. Si la canalización intenta crear una fila en la tabla PersonOwnAccount sin una fila coincidente en la tabla Person o en la tabla Account, Spanner devuelve un error. A continuación, el flujo de procesamiento falla.

En este ejemplo de flujo de procesamiento ETL inverso se usan EXPORTDATA instrucciones en BigQuery para exportar datos de las tablas Person, Account y PersonOwnAccount de un conjunto de datos para cumplir las dependencias del orden de carga:

BEGIN
EXPORT DATA OPTIONS (
    uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
    format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "Person",
      "priority": "HIGH",
      "tag" : "graph_data_load_person"
    }"""
  ) AS
  SELECT
    id,
    name
  FROM
    DATASET_NAME.Person;

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
  spanner_options="""{
    "table": "Account",
    "priority": "HIGH",
    "tag" : "graph_data_load_account"
  }"""
) AS
SELECT
  id,
  create_time,
  is_blocked,
  type
FROM
  DATASET_NAME.Account;

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
  spanner_options="""{
    "table": "PersonOwnAccount",
    "priority": "HIGH",
    "tag" : "graph_data_load_person_own_account"
  }"""
) AS
SELECT
  id,
  account_id,
  create_time
FROM
  DATASET_NAME.PersonOwnAccount;
END;

Sincronizar datos

Para sincronizar BigQuery con Spanner Graph, usa flujos de procesamiento de ETL inversos. Puede configurar una canalización para que haga lo siguiente:

  • Aplica las inserciones y actualizaciones de la fuente de BigQuery a la tabla de destino de Spanner Graph. Puedes añadir elementos de esquema a las tablas de destino para comunicar las eliminaciones de forma lógica y quitar las filas de las tablas de destino según una programación.

  • Usa una función de serie temporal que aplique operaciones de inserción y actualización, e identifique las operaciones de eliminación.

Restricciones de integridad referencial

A diferencia de Spanner, BigQuery no aplica restricciones de clave principal ni de clave externa. Si los datos de BigQuery no cumplen las restricciones que creas en tus tablas de Spanner, es posible que la canalización de ETL inverso falle al cargar esos datos.

Reverse ETL agrupa automáticamente los datos en lotes que no superan el límite máximo de mutaciones por confirmación y aplica los lotes de forma atómica a una tabla de Spanner en un orden arbitrario. Si un lote contiene datos que no superan una comprobación de integridad referencial, Spanner no carga ese lote. Algunos ejemplos de estos errores son una fila secundaria intercalada que no tiene una fila principal o una columna de clave externa obligatoria sin un valor coincidente en la columna de referencia. Si un lote no supera una comprobación, la canalización falla y se detiene la carga de lotes.

Información sobre los errores de restricción de integridad referencial

En los siguientes ejemplos se muestran errores de restricción de integridad referencial que pueden producirse:

Resolver errores de restricción de clave externa
  • Error: "Se ha infringido la restricción de clave externa FK_Account en la tabla PersonOwnAccount. No se encuentran los valores de referencia en Account(id)"

  • Causa: No se ha podido insertar una fila en la tabla PersonOwnAccount porque falta una fila coincidente en la tabla Account, que requiere la clave externa FK_Account.

Resolver errores por falta de fila principal
  • Error: "Falta la fila principal de la fila [15,1] de la tabla PersonOwnAccount"

  • Causa: No se ha podido insertar una fila en PersonOwnAccount (id: 15 y account_id: 1) porque falta una fila principal en la tabla Person (id: 15).

Para reducir el riesgo de que se produzcan errores de integridad referencial, tenga en cuenta las siguientes opciones. Cada opción tiene sus pros y sus contras.

  • Relaja las restricciones para permitir que Spanner Graph cargue datos.
  • Añade lógica a tu canalización para omitir las filas que infrinjan las restricciones de integridad referencial.

Relajar la integridad referencial

Una opción para evitar errores de integridad referencial al cargar datos es relajar las restricciones para que Spanner no aplique la integridad referencial.

  • Puedes crear tablas intercaladas con la cláusula INTERLEAVE IN para usar las mismas características de intercalación de filas físicas. Si usas INTERLEAVE IN en lugar de INTERLEAVE IN PARENT, Spanner no aplica la integridad referencial, aunque las consultas se benefician de la ubicación conjunta de las tablas relacionadas.

  • Puede crear claves externas informativas con la opción NOT ENFORCED. La opción NOT ENFORCED ofrece ventajas de optimización de consultas. Sin embargo, Spanner no aplica la integridad referencial.

Por ejemplo, para crear la tabla de entrada de aristas sin comprobaciones de integridad referencial, puedes usar este DDL:

CREATE TABLE PersonOwnAccount (
  id          INT64 NOT NULL,
  account_id  INT64 NOT NULL,
  create_time TIMESTAMP,
  CONSTRAINT FK_Account FOREIGN KEY (account_id) REFERENCES Account (id) NOT ENFORCED
) PRIMARY KEY (id, account_id),
INTERLEAVE IN Person;

Respetar la integridad referencial en los flujos de ETL inversos

Para asegurarse de que la canalización solo carga las filas que cumplen las comprobaciones de integridad referencial, incluya solo las filas PersonOwnAccount que tengan filas coincidentes en las tablas Person y Account. Después, conserva el orden de carga para que Spanner cargue las filas Person y Account antes que las filas PersonOwnAccount que hacen referencia a ellas.

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag" : "graph_data_load_person_own_account"
    }"""
  ) AS
  SELECT
    poa.id,
    poa.account_id,
    poa.create_time
  FROM `PROJECT_ID.DATASET_NAME.PersonOwnAccount` poa
    JOIN `PROJECT_ID.DATASET_NAME.Person` p ON (poa.id = p.id)
    JOIN `PROJECT_ID.DATASET_NAME.Account` a ON (poa.account_id = a.id)
  WHERE poa.id = p.id
    AND poa.account_id = a.id;

Eliminar elementos de un gráfico

Los flujos de procesamiento de extracción, transformación y carga (ETL) inversos usan operaciones de inserción o actualización. Como las operaciones de inserción y actualización son equivalentes a las instrucciones INSERT OR UPDATE, una canalización solo puede sincronizar las filas que existen en los datos de origen en tiempo de ejecución. Esto significa que la canalización excluye las filas eliminadas. Si eliminas datos de BigQuery, una canalización de ETL inversa no puede eliminar directamente los mismos datos de Spanner Graph.

Puede usar una de las siguientes opciones para gestionar las eliminaciones de las tablas de origen de BigQuery:

Realizar una eliminación lógica o suave en la fuente

Para marcar lógicamente las filas que se van a eliminar, usa una marca de eliminación en BigQuery. A continuación, cree una columna en la tabla de Spanner de destino en la que pueda propagar la marca. Cuando la ETL inversa aplique las actualizaciones de la canalización, elimina las filas que tengan esta marca en Spanner. Puedes buscar y eliminar estas filas explícitamente mediante DML particionado. También puedes eliminar filas de forma implícita configurando una columna TTL (tiempo de vida) con una fecha que dependa de la columna de marca de eliminación. Escribe consultas de Spanner para excluir estas filas eliminadas de forma lógica. De esta forma, Spanner excluye estas filas de los resultados antes de la eliminación programada. Una vez que se haya completado la canalización de ETL inversa, Spanner reflejará las eliminaciones lógicas en sus filas. Después, puedes eliminar filas de BigQuery.

En este ejemplo se añade una columna is_deleted a la tabla PersonOwnAccount de Spanner. A continuación, añade una columna expired_ts_generated que depende del valor is_deleted. La política de TTL programa la eliminación de las filas afectadas porque la fecha de la columna generada es anterior al umbral de DELETION POLICY.

ALTER TABLE PersonOwnAccount
  ADD COLUMN is_deleted BOOL DEFAULT (FALSE);

ALTER TABLE PersonOwnAccount ADD COLUMN
  expired_ts_generated TIMESTAMP AS (IF(is_deleted,
    TIMESTAMP("1970-01-01 00:00:00+00"),
    TIMESTAMP("9999-01-01 00:00:00+00"))) STORED HIDDEN;

ALTER TABLE PersonOwnAccount
  ADD ROW DELETION POLICY (OLDER_THAN(expired_ts_generated, INTERVAL 0 DAY));

Usar el historial de cambios de BigQuery para las inserciones, las actualizaciones y las eliminaciones lógicas

Puedes hacer un seguimiento de los cambios en una tabla de BigQuery mediante su historial de cambios. Usa la función de GoogleSQL CHANGES para buscar las filas que han cambiado en un intervalo de tiempo específico. A continuación, usa la información de la fila eliminada con un flujo de procesamiento de ETL inverso. Puedes configurar la pipeline para definir un indicador, como una marca de eliminación o una fecha de vencimiento, en la tabla de Spanner. Este indicador marca las filas que se van a eliminar en las tablas de Spanner.

Usa los resultados de la función de serie temporal CHANGES para decidir qué filas de la tabla de origen quieres incluir en la carga de tu canalización de ETL inversa.

El flujo de procesamiento incluye filas con _CHANGE_TYPE como INSERT o UPDATE como inserciones si la fila existe en la tabla de origen. La fila actual de la tabla de origen proporciona los datos más recientes.

Usa las filas con _CHANGE_TYPE como DELETE que no tengan filas en la tabla de origen para definir un indicador en la tabla de Spanner, como una marca de eliminación o una fecha de vencimiento de la fila.

Tu consulta de exportación debe tener en cuenta el orden de las inserciones y eliminaciones en BigQuery. Por ejemplo, supongamos que se elimina una fila en el momento T1 y se inserta una fila nueva en un momento posterior T2. Si ambos se asignan a la misma fila de la tabla de Spanner, la exportación debe conservar los efectos de estos eventos en su orden original.

Si se define, el indicador delete marca las filas que se van a eliminar en las tablas de Spanner.

Por ejemplo, puedes añadir una columna a una tabla de entrada de Spanner para almacenar la fecha de vencimiento de cada fila. A continuación, crea una política de eliminación que utilice estas fechas de vencimiento.

En el siguiente ejemplo se muestra cómo añadir una columna para almacenar las fechas de vencimiento de las filas de la tabla.

ALTER TABLE PersonOwnAccount ADD COLUMN expired_ts TIMESTAMP;

ALTER TABLE PersonOwnAccount
  ADD ROW DELETION POLICY (OLDER_THAN(expired_ts, INTERVAL 1 DAY));

Para usar la función CHANGES en una tabla de BigQuery, define la opción enable_change_history de la tabla en TRUE:

ALTER TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`
  SET OPTIONS (enable_change_history=TRUE);

En el siguiente ejemplo se muestra cómo puedes usar el ETL inverso para actualizar las filas nuevas o modificadas y definir la fecha de vencimiento de las filas marcadas para eliminar. Una combinación externa izquierda con la tabla PersonOwnAccount proporciona a la consulta información sobre el estado actual de cada fila.

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
    format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag" : "graph_data_delete_via_reverse_etl"
    }"""
  ) AS
SELECT
  DISTINCT
   IF (changes._CHANGE_TYPE = 'DELETE', changes.id, poa.id) AS id,
   IF (changes._CHANGE_TYPE = 'DELETE', changes.account_id, poa.account_id) AS account_id,
   IF (changes._CHANGE_TYPE = 'DELETE', changes.create_time, poa.create_time) AS create_time,
   IF (changes._CHANGE_TYPE = 'DELETE', changes._CHANGE_TIMESTAMP, NULL) AS expired_ts
FROM
  CHANGES(TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`,
    TIMESTAMP_TRUNC(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY), DAY),
    TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY)) changes
LEFT JOIN `PROJECT_ID.DATASET_NAME.PersonOwnAccount` poa
  ON (poa.id = changes.id
  AND poa.account_id = changes.account_id)
WHERE (changes._CHANGE_TYPE = 'DELETE'
   AND poa.id IS NULL)
   OR (changes._CHANGE_TYPE IN ( 'UPDATE', 'INSERT')
   AND poa.id IS NOT NULL );

La consulta de ejemplo usa un LEFT JOIN con la tabla de origen para conservar el orden. Esta combinación asegura que se ignoren los registros de cambios de DELETE en las filas eliminadas y, a continuación, recreadas en el intervalo del historial de cambios de la consulta. La canalización conserva la fila nueva válida.

Cuando eliminas filas, la canalización rellena la columna expired_ts de la fila de gráfico de Spanner correspondiente con la marca de tiempo DELETE de la columna _CHANGE_TIMESTAMP. Una política de eliminación de filas (política de TTL) en Spanner elimina cualquier fila en la que el valor de expired_ts sea de hace más de un día.

Para asegurar la fiabilidad del sistema, coordina la programación de la canalización, el periodo retrospectivo de los cambios y la política de TTL de Spanner. Programa la pipeline para que se ejecute a diario. La política de TTL de Spanner debe tener una duración mayor que este intervalo de ejecución. De esta forma, se evita que la canalización vuelva a procesar un evento DELETE anterior de una fila que ya se ha eliminado mediante la política de TTL de Spanner.

En este ejemplo se muestra el intervalo start_timestamp y end_timestamp de las consultas diarias que registran todos los cambios de la tabla de BigQuery del día anterior en UTC. Como se trata de una consulta por lotes y la función CHANGES tiene limitaciones, el valor de end_timestamp debe ser al menos 10 minutos anterior a la hora actual. Por lo tanto, programa esta consulta para que se ejecute al menos 10 minutos después de la medianoche UTC. Para obtener más información, consulta la documentación de CHANGES.

Usar columnas TTL con la marca de tiempo de la última vez que se vio

Una canalización de ETL inversa asigna a la columna last_seen_ts la marca de tiempo actual de cada fila de la tabla de Spanner. Cuando eliminas filas de BigQuery, Spanner no actualiza las filas correspondientes y la columna last_seen_ts no cambia. A continuación, Spanner elimina las filas con un last_seen_ts obsoleto mediante una política de TTL o DML particionado, en función de un umbral definido. Antes de la eliminación programada, las consultas de Spanner pueden filtrar las filas con un last_seen_ts anterior a este umbral. Este enfoque funciona de forma eficaz cuando los datos de los gráficos se actualizan con frecuencia y las actualizaciones que faltan indican que los datos están obsoletos y se deben eliminar.

Realizar una actualización completa

Antes de cargar datos de BigQuery, puede eliminar tablas de Spanner para que se reflejen las eliminaciones en las tablas de origen. De esta forma, se evita que el flujo de procesamiento cargue en Spanner las filas eliminadas de las tablas de BigQuery de origen durante la siguiente ejecución del flujo de procesamiento. Esta puede ser la opción más sencilla de implementar. Sin embargo, ten en cuenta el tiempo necesario para volver a cargar por completo los datos del gráfico.

Mantener un flujo de procesamiento de extracción, transformación y carga (ETL) inverso por lotes programado

Después de la ejecución inicial de tu flujo de procesamiento de datos de ETL inverso, se cargan datos de forma masiva de BigQuery en Spanner Graph. Los datos del mundo real siguen cambiando. Los conjuntos de datos cambian y la canalización añade o quita elementos de gráfico con el tiempo. El flujo de procesamiento descubre nuevos nodos y añade nuevas relaciones de arista, o bien la inferencia de la IA los genera.

Para que la base de datos de grafos de Spanner esté siempre actualizada, programa y secuencia la orquestación de la canalización de BigQuery con una de las siguientes opciones:

Flujos de procesamiento de BigQuery te permiten desarrollar, probar, controlar versiones y desplegar flujos de trabajo complejos de transformación de datos de SQL en BigQuery. Gestiona de forma nativa las dependencias de orden, ya que te permite definir relaciones entre las consultas de tu canalización. Dataform crea un árbol de dependencias y ejecuta tus consultas en el orden correcto. De esta forma, las dependencias upstream se completan antes de que empiecen las tareas downstream.

Los flujos de trabajo invocados por Cloud Scheduler proporcionan una solución útil y flexible para orquestar secuencias de Google Cloud servicios, incluidas las consultas de BigQuery. Define un flujo de trabajo como una serie de pasos en los que se ejecuta una tarea de BigQuery. Puedes usar Cloud Scheduler para invocar estos flujos de trabajo según una programación definida. Gestiona las dependencias mediante la definición del flujo de trabajo para especificar el orden de ejecución, implementar la lógica condicional, gestionar los errores y transferir las salidas de una consulta a otra.

Las consultas programadas, también conocidas como tareas de transferencia de BigQuery, te permiten ejecutar instrucciones SQL de forma periódica en BigQuery. Las consultas programadas no ofrecen una gestión de errores sólida ni una gestión de dependencias dinámica.

ETL inversa con consultas continuas de BigQuery

La función Consultas continuas de BigQuery te permite ejecutar operaciones de BigQuery casi en tiempo real. La combinación de EXPORT DATA con consultas continuas proporciona un método alternativo para ejecutar flujos de trabajo de ETL inverso que evita las tareas por lotes programadas.

Una consulta continua es una consulta de larga duración que monitoriza una tabla de origen de BigQuery para detectar nuevas filas. Cuando BigQuery detecta que se han añadido filas a la tabla, transmite los resultados de la consulta a la operación EXPORT DATA.

Este enfoque ofrece las siguientes ventajas:

  • Sincronización de datos casi en tiempo real: las filas nuevas de BigQuery se reflejan en Spanner con un retraso mínimo.

  • Menor sobrecarga de procesamiento por lotes: una consulta continua elimina la necesidad de realizar trabajos por lotes periódicos, lo que reduce la sobrecarga computacional.

  • Actualizaciones basadas en eventos: los datos de Spanner se actualizan en respuesta a los cambios reales en BigQuery.

Una canalización de consultas continuas requiere una asignación de reserva de ranuras con el job_type de CONTINUOUS. Asigna este rol a nivel de proyecto o carpeta, o bien a nivel de organización.

Crear una consulta continua con ETL inverso de BigQuery a Spanner

Configura el parámetro start_timestamp de la función APPENDS para empezar a procesar los datos en el punto en el que se interrumpió la carga por lotes. Esta función registra todas las filas creadas en el periodo específico. En el siguiente ejemplo, la canalización define arbitrariamente el punto de inicio 10 minutos antes de CURRENT_TIME. Esta marca de tiempo debe estar dentro del periodo de retroceso en el tiempo de BigQuery.

Hay varios métodos para iniciar una canalización de consultas continuas, entre los que se incluyen los siguientes:

  1. En BigQuery Studio, selecciona Más y elige Consulta continua en Elegir modo de consulta.

  2. Usa la CLI bq y proporciona la opción --continuous=true.

EXPORT DATA OPTIONS ( uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format="CLOUD_SPANNER",
  spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag": "reverse-etl-continuous",
      "change_timestamp_column": "create_time"
   }"""
)
AS SELECT id, account_id, _CHANGE_TIMESTAMP as create_time
  FROM
APPENDS(TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`,
  CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE )

No se garantiza el orden de carga

Los datos de Spanner Graph constan de varias tablas de entrada. Debes seguir un orden de carga estricto cuando las tablas tengan restricciones de integridad referencial. Sin embargo, las consultas continuas simultáneas no pueden controlar el orden en el que Spanner añade filas. Por lo tanto, la carga de datos de Spanner Graph mediante consultas continuas solo se puede hacer en esquemas de gráficos con restricciones de integridad referencial flexibles.

Integración con las cadenas de producción

Las consultas continuas complementan las tareas por lotes programadas. Por ejemplo, usa consultas continuas para obtener actualizaciones casi en tiempo real y tareas programadas para sincronizar o conciliar datos completos.

Usa la consulta continua de BigQuery para crear flujos de procesamiento de ETL inverso actualizados y con capacidad de respuesta para sincronizar datos entre BigQuery y Spanner Graph.

Consideraciones sobre las consultas continuas

  • Coste: las consultas continuas generan costes por la ejecución continua de consultas y el streaming de datos.

  • Gestión de errores: una canalización de consultas continuas se cancela si se produce algún error en la base de datos, como una clave principal duplicada o una infracción de la integridad referencial. Si una canalización falla, debes corregir manualmente los datos de la tabla de BigQuery de origen antes de reiniciar la consulta.

  • Eliminaciones y actualizaciones no gestionadas: la función APPENDS solo captura inserciones. No registra las eliminaciones ni las actualizaciones.

Sigue las prácticas recomendadas de ETL inverso

Para obtener los mejores resultados posibles, haz lo siguiente.

  • Elige una estrategia para evitar errores de integridad referencial al cargar datos de aristas.

  • Diseña tu flujo de datos general para evitar los bordes colgantes. Los bordes colgantes pueden poner en peligro la eficiencia de las consultas de Spanner Graph y la integridad de la estructura del gráfico. Para obtener más información, consulta cómo evitar los bordes colgantes.

  • Sigue las recomendaciones de optimización de exportación de Spanner.

  • Si vas a cargar una gran cantidad de datos, te recomendamos que dividas la canalización en varias canalizaciones más pequeñas para no superar la cuota predeterminada de seis horas de tiempo de ejecución de consultas de BigQuery. Para obtener más información, consulta los límites de los trabajos de consulta de BigQuery.

  • En el caso de las cargas de datos de gran tamaño, añade índices y restricciones de clave externa una vez que se haya completado la carga inicial de datos en bloque. Esta práctica mejora el rendimiento de la carga de datos, ya que las restricciones de clave externa requieren lecturas adicionales para la validación y los índices requieren escrituras adicionales. Estas operaciones aumentan el número de participantes en las transacciones, lo que puede ralentizar el proceso de carga de datos.

  • Habilita el autoescalado en Spanner para acelerar los tiempos de carga de datos en una instancia. A continuación, configura el parámetro priority de Spanner en la sección spanner_options del comando de BigQuery EXPORT DATA como HIGH. Para obtener más información, consulta los artículos Descripción general del escalado automático de Spanner, Configurar exportaciones con la opción spanner_options y RequestOptions.priority.

  • En el caso de las cargas de datos de gran tamaño, crea puntos de división para dividir previamente tu base de datos. De esta forma, Spanner se prepara para un mayor rendimiento.

  • Configura la prioridad de las solicitudes de Spanner para la carga de datos en la definición de la canalización.

Siguientes pasos