Use a ETL inversa para carregar dados do BigQuery para o Spanner Graph

Este documento descreve como usar pipelines de extração, transformação e carregamento (ETL) inversos para mover e sincronizar continuamente dados de grafos do BigQuery para o Spanner Graph. Abrange os seguintes aspetos principais:

Para usar o ETL inverso para exportar dados do BigQuery para o Spanner, consulte o artigo Exporte dados para o Spanner.

O BigQuery realiza manipulação de dados complexa em grande escala como uma plataforma de tratamento analítico, enquanto o Spanner está otimizado para exemplos de utilização que requerem um QPS elevado e uma latência de publicação baixa. O Spanner Graph e o BigQuery integram-se de forma eficaz para preparar dados de grafos em pipelines de estatísticas do BigQuery, o que permite ao Spanner publicar travessias de grafos de baixa latência.

Antes de começar

  1. Crie uma instância do Spanner com uma base de dados que contenha dados de grafos. Para mais informações, consulte o artigo Configure e consulte o Spanner Graph.

  2. No BigQuery, crie uma reserva de slots do nível Enterprise ou Enterprise Plus. Pode reduzir os custos de computação do BigQuery quando executa exportações para o Spanner Graph. Para o fazer, defina uma capacidade de espaço base de zero e ative o ajuste automático.

  3. Conceda funções de gestão de identidade e de acesso (IAM) que dão aos utilizadores as autorizações necessárias para realizar cada tarefa neste documento.

Funções necessárias

Para obter as autorizações de que precisa para exportar dados de grafos do BigQuery para o Spanner Graph, peça ao seu administrador que lhe conceda as seguintes funções do IAM no seu projeto:

Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.

Também pode conseguir as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.

Exemplos de utilização da ETL inversa

Seguem-se exemplos de utilização. Depois de analisar e processar dados no BigQuery, pode movê-los para o Spanner Graph através da ETL inversa.

Agregação e resumo de dados: use o BigQuery para calcular agregações sobre dados detalhados para os tornar mais adequados para exemplos de utilização operacionais.

Transformação e enriquecimento de dados: use o BigQuery para limpar e padronizar os dados recebidos de diferentes origens de dados.

Filtragem e seleção de dados: use o BigQuery para filtrar um conjunto de dados grande para fins de análise. Por exemplo, pode filtrar dados que não são necessários para aplicações em tempo real.

Pré-processamento e engenharia de caraterísticas: no BigQuery, use a função ML.TRANSFORM para transformar dados ou a função ML.FEATURE_CROSS para criar combinações de caraterísticas de caraterísticas de entrada. Em seguida, use a ETL inversa para mover os dados resultantes para o Spanner Graph.

Compreenda o pipeline de ETL inverso

Os dados movem-se do BigQuery para o Spanner Graph num pipeline ETL inverso em dois passos:

  1. O BigQuery usa ranhuras atribuídas à tarefa de pipeline para extrair e transformar dados de origem.

  2. O pipeline de ETL inverso do BigQuery usa APIs do Spanner para carregar dados numa instância do Spanner aprovisionada.

O diagrama seguinte mostra os passos num pipeline de ETL inverso:

Um diagrama que mostra os três passos principais quando os dados se movem do
BigQuery para o Spanner Graph num pipeline de
ETL inverso.

Figura 1. Processo do pipeline de ETL inverso do BigQuery

Faça a gestão das alterações aos dados do gráfico

Pode usar o ETL inverso para fazer o seguinte:

  • Carregue um conjunto de dados de grafos do BigQuery para o Spanner Graph.

  • Sincronize os dados do gráfico do Spanner com as atualizações contínuas de um conjunto de dados no BigQuery.

Configura um pipeline de ETL inverso com uma consulta SQL para especificar os dados de origem e a transformação a aplicar. O pipeline carrega todos os dados que satisfazem a cláusula WHERE da declaração SELECT no Spanner através de uma operação de inserção/atualização. Uma operação de inserção/atualização é equivalente a declarações INSERT OR UPDATE. Insere novas linhas e atualiza as linhas existentes em tabelas que armazenam dados de gráficos. O pipeline baseia as linhas novas e atualizadas numa chave primária da tabela do Spanner.

Insira e atualize dados para tabelas com dependências de ordem de carregamento

As práticas recomendadas de design de esquemas de grafos do Spanner recomendam a utilização de tabelas intercaladas e chaves externas. Se usar tabelas intercaladas ou chaves externas aplicadas, tem de carregar os dados de nós e arestas numa ordem específica. Isto garante que as linhas referenciadas existem antes de criar a linha de referência. Para mais informações, consulte Crie tabelas intercaladas.

O esquema da tabela de entrada do gráfico de exemplo seguinte usa uma tabela intercalada e uma restrição de chave externa para modelar a relação entre uma pessoa e as respetivas contas:

CREATE TABLE Person (
  id    INT64 NOT NULL,
  name  STRING(MAX)
) PRIMARY KEY (id);

CREATE TABLE Account (
  id           INT64 NOT NULL,
  create_time  TIMESTAMP,
  is_blocked   BOOL,
  type        STRING(MAX)
) PRIMARY KEY (id);

CREATE TABLE PersonOwnAccount (
  id           INT64 NOT NULL,
  account_id   INT64 NOT NULL,
  create_time  TIMESTAMP,
  CONSTRAINT FK_Account FOREIGN KEY (account_id) REFERENCES Account (id)
) PRIMARY KEY (id, account_id),
  INTERLEAVE IN PARENT Person ON DELETE CASCADE;

CREATE PROPERTY GRAPH FinGraph
  NODE TABLES (
    Person,
    Account
  )
  EDGE TABLES (
    PersonOwnAccount
      SOURCE KEY (id) REFERENCES Person
      DESTINATION KEY (account_id) REFERENCES Account
      LABEL Owns
  );

Neste esquema de exemplo, PersonOwnAccount é uma tabela intercalada em Person. Carregue os elementos na tabela Person antes dos elementos na tabela PersonOwnAccount. Além disso, a restrição de chave externa em PersonOwnAccount garante que existe uma linha correspondente em Account, o destino da relação de aresta. Por conseguinte, carregue a tabela Account antes da tabela PersonOwnAccount. A lista seguinte resume as dependências da ordem de carregamento deste esquema:

Siga estes passos para carregar os dados:

  1. Carregue Person antes de PersonOwnAccount.
  2. Carregue Account antes de PersonOwnAccount.

O Spanner aplica as restrições de integridade referencial no esquema de exemplo. Se o pipeline tentar criar uma linha na tabela PersonOwnAccount sem uma linha correspondente na tabela Person ou na tabela Account, o Spanner devolve um erro. Em seguida, o pipeline falha.

Este exemplo de pipeline ETL inverso usa declarações EXPORTDATA no BigQuery para exportar dados das tabelas Person, Account e PersonOwnAccount num conjunto de dados para cumprir as dependências da ordem de carregamento:

BEGIN
EXPORT DATA OPTIONS (
    uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
    format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "Person",
      "priority": "HIGH",
      "tag" : "graph_data_load_person"
    }"""
  ) AS
  SELECT
    id,
    name
  FROM
    DATASET_NAME.Person;

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
  spanner_options="""{
    "table": "Account",
    "priority": "HIGH",
    "tag" : "graph_data_load_account"
  }"""
) AS
SELECT
  id,
  create_time,
  is_blocked,
  type
FROM
  DATASET_NAME.Account;

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
  spanner_options="""{
    "table": "PersonOwnAccount",
    "priority": "HIGH",
    "tag" : "graph_data_load_person_own_account"
  }"""
) AS
SELECT
  id,
  account_id,
  create_time
FROM
  DATASET_NAME.PersonOwnAccount;
END;

Sincronize dados

Para sincronizar o BigQuery com o Spanner Graph, use pipelines ETL reversos. Pode configurar um pipeline para fazer uma das seguintes ações:

  • Aplique todas as inserções e atualizações da origem do BigQuery à tabela de destino do Spanner Graph. Pode adicionar elementos de esquema às tabelas de destino para comunicar logicamente as eliminações e remover linhas das tabelas de destino de acordo com uma programação.

  • Use uma função de série temporal que aplique operações de inserção e atualização e identifique operações de eliminação.

Restrições de integridade referencial

Ao contrário do Spanner, o BigQuery não aplica restrições de chaves primárias e estrangeiras. Se os seus dados do BigQuery não estiverem em conformidade com as restrições que cria nas tabelas do Spanner, o pipeline de ETL inverso pode falhar ao carregar esses dados.

A ETL inversa agrupa automaticamente os dados em lotes que não excedem o limite máximo de mutações por confirmação e aplica os lotes atomicamente a uma tabela do Spanner por ordem arbitrária. Se um lote contiver dados que falham numa verificação de integridade referencial, o Spanner não carrega esse lote. Exemplos de tais falhas incluem uma linha secundária intercalada sem uma linha principal ou uma coluna de chave externa aplicada sem um valor correspondente na coluna referenciada. Se um lote falhar uma verificação, o pipeline falha com um erro e para de carregar lotes.

Compreenda os erros de restrição de integridade referencial

Os exemplos seguintes mostram erros de restrição de integridade referencial que pode encontrar:

Resolva erros de restrição de chave externa
  • Erro: "A restrição de chave externa FK_Account foi violada na tabela PersonOwnAccount. Não é possível encontrar valores referenciados em Account(id)"

  • Causa: falha na inserção de uma linha na tabela PersonOwnAccount porque falta uma linha correspondente na tabela Account, que a chave externa FK_Account requer.

Resolva os erros de falta de linhas principais
  • Erro: "A linha principal da linha [15,1] na tabela PersonOwnAccount está em falta"

  • Causa: a inserção de uma linha em PersonOwnAccount (id: 15 e account_id: 1) falhou porque falta uma linha principal na tabela Person (id: 15).

Para reduzir o risco de erros de integridade referencial, considere as seguintes opções. Cada opção tem vantagens e desvantagens.

  • Relaxe as restrições para permitir que o Spanner Graph carregue dados.
  • Adicione lógica ao seu pipeline para omitir linhas que violem as restrições de integridade referencial.

Relaxe a integridade referencial

Uma opção para evitar erros de integridade referencial ao carregar dados é relaxar as restrições para que o Spanner não aplique a integridade referencial.

  • Pode criar tabelas intercaladas com a cláusula INTERLEAVE IN para usar as mesmas características de intercalação de linhas físicas. Se usar INTERLEAVE IN em vez de INTERLEAVE IN PARENT, o Spanner não aplica a integridade referencial, embora as consultas beneficiem da colocação conjunta de tabelas relacionadas.

  • Pode criar chaves externas informativas usando a opção NOT ENFORCED. A opção NOT ENFORCED oferece vantagens de otimização de consultas. No entanto, o Spanner não aplica a integridade referencial.

Por exemplo, para criar a tabela de entrada de arestas sem verificações de integridade referencial, pode usar este DDL:

CREATE TABLE PersonOwnAccount (
  id          INT64 NOT NULL,
  account_id  INT64 NOT NULL,
  create_time TIMESTAMP,
  CONSTRAINT FK_Account FOREIGN KEY (account_id) REFERENCES Account (id) NOT ENFORCED
) PRIMARY KEY (id, account_id),
INTERLEAVE IN Person;

Respeite a integridade referencial nos pipelines de ETL inversa

Para garantir que o pipeline carrega apenas linhas que satisfazem as verificações de integridade referencial, inclua apenas PersonOwnAccount linhas que tenham linhas correspondentes nas tabelas Person e Account. Em seguida, preserve a ordem de carregamento, para que o Spanner carregue as linhas Person e Account antes das linhas PersonOwnAccount que se referem a elas.

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag" : "graph_data_load_person_own_account"
    }"""
  ) AS
  SELECT
    poa.id,
    poa.account_id,
    poa.create_time
  FROM `PROJECT_ID.DATASET_NAME.PersonOwnAccount` poa
    JOIN `PROJECT_ID.DATASET_NAME.Person` p ON (poa.id = p.id)
    JOIN `PROJECT_ID.DATASET_NAME.Account` a ON (poa.account_id = a.id)
  WHERE poa.id = p.id
    AND poa.account_id = a.id;

Elimine elementos do gráfico

Os pipelines de ETL inverso usam operações de inserção/atualização. Uma vez que as operações de inserção/atualização são equivalentes a declarações INSERT OR UPDATE, um pipeline só pode sincronizar linhas que existam nos dados de origem no momento da execução. Isto significa que o pipeline exclui as linhas eliminadas. Se eliminar dados do BigQuery, um pipeline de ETL inverso não pode remover diretamente os mesmos dados do Spanner Graph.

Pode usar uma das seguintes opções para processar eliminações de tabelas de origem do BigQuery:

Faça uma eliminação lógica ou temporária na origem

Para marcar logicamente linhas para eliminação, use uma flag eliminada no BigQuery. Em seguida, crie uma coluna na tabela do Spanner para a qual possa propagar a flag. Quando o ETL inverso aplica as atualizações da pipeline, elimina as linhas que têm esta flag no Spanner. Pode encontrar e eliminar essas linhas explicitamente através de DML particionada. Em alternativa, elimine linhas implicitamente configurando uma coluna TTL (tempo de vida) com uma data que dependa da coluna de indicador de eliminação. Escreva consultas Spanner para excluir estas linhas eliminadas logicamente. Isto garante que o Spanner exclui estas linhas dos resultados antes da eliminação agendada. Depois de a pipeline de ETL inversa ser executada até à conclusão, o Spanner reflete as eliminações lógicas nas respetivas linhas. Em seguida, pode eliminar linhas do BigQuery.

Este exemplo adiciona uma coluna is_deleted à tabela PersonOwnAccount no Spanner. Em seguida, adiciona uma coluna expired_ts_generated que depende do valor is_deleted. A política de TTL agenda a eliminação das linhas afetadas porque a data na coluna gerada é anterior ao limite de DELETION POLICY.

ALTER TABLE PersonOwnAccount
  ADD COLUMN is_deleted BOOL DEFAULT (FALSE);

ALTER TABLE PersonOwnAccount ADD COLUMN
  expired_ts_generated TIMESTAMP AS (IF(is_deleted,
    TIMESTAMP("1970-01-01 00:00:00+00"),
    TIMESTAMP("9999-01-01 00:00:00+00"))) STORED HIDDEN;

ALTER TABLE PersonOwnAccount
  ADD ROW DELETION POLICY (OLDER_THAN(expired_ts_generated, INTERVAL 0 DAY));

Use o histórico de alterações do BigQuery para inserções, atualizações e eliminações lógicas

Pode acompanhar as alterações a uma tabela do BigQuery através do respetivo histórico de alterações. Use a função GoogleSQL CHANGES para encontrar linhas que foram alteradas num intervalo de tempo específico. Em seguida, use as informações da linha eliminada com um pipeline de ETL inverso. Pode configurar o pipeline para definir um indicador, como uma flag de eliminação ou uma data de validade, na tabela do Spanner. Este indicador marca as linhas para eliminação nas tabelas do Spanner.

Use os resultados da função de série cronológica CHANGES para decidir que linhas da tabela de origem incluir no carregamento do pipeline de ETL inverso.

O pipeline inclui linhas com _CHANGE_TYPE como INSERT ou UPDATE como inserções/atualizações se a linha existir na tabela de origem. A linha atual da tabela de origem fornece os dados mais recentes.

Use linhas com _CHANGE_TYPE como DELETE que não têm linhas existentes na tabela de origem para definir um indicador na tabela do Spanner, como um indicador de eliminação ou uma data de validade da linha.

A sua consulta de exportação tem de ter em conta a ordem das inserções e eliminações no BigQuery. Por exemplo, considere uma linha eliminada no momento T1 e uma nova linha inserida num momento posterior T2. Se ambos forem mapeados para a mesma linha da tabela do Spanner, a exportação tem de preservar os efeitos destes eventos na respetiva ordem original.

Se estiver definido, o indicador delete marca as linhas para eliminação nas tabelas do Spanner.

Por exemplo, pode adicionar uma coluna a uma tabela de entrada do Spanner para armazenar a data de validade de cada linha. Em seguida, crie uma política de eliminação que use estas datas de validade.

O exemplo seguinte mostra como adicionar uma coluna para armazenar as datas de validade das linhas da tabela.

ALTER TABLE PersonOwnAccount ADD COLUMN expired_ts TIMESTAMP;

ALTER TABLE PersonOwnAccount
  ADD ROW DELETION POLICY (OLDER_THAN(expired_ts, INTERVAL 1 DAY));

Para usar a função CHANGES numa tabela no BigQuery, defina a opção enable_change_history da tabela como TRUE:

ALTER TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`
  SET OPTIONS (enable_change_history=TRUE);

O exemplo seguinte mostra como pode usar a ETL inversa para atualizar linhas novas ou alteradas e definir a data de validade das linhas marcadas para eliminação. Uma junção à esquerda com a tabela PersonOwnAccount dá à consulta informações sobre o estado atual de cada linha.

EXPORT DATA OPTIONS (
  uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
    format='CLOUD_SPANNER',
    spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag" : "graph_data_delete_via_reverse_etl"
    }"""
  ) AS
SELECT
  DISTINCT
   IF (changes._CHANGE_TYPE = 'DELETE', changes.id, poa.id) AS id,
   IF (changes._CHANGE_TYPE = 'DELETE', changes.account_id, poa.account_id) AS account_id,
   IF (changes._CHANGE_TYPE = 'DELETE', changes.create_time, poa.create_time) AS create_time,
   IF (changes._CHANGE_TYPE = 'DELETE', changes._CHANGE_TIMESTAMP, NULL) AS expired_ts
FROM
  CHANGES(TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`,
    TIMESTAMP_TRUNC(TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY), DAY),
    TIMESTAMP_TRUNC(CURRENT_TIMESTAMP(), DAY)) changes
LEFT JOIN `PROJECT_ID.DATASET_NAME.PersonOwnAccount` poa
  ON (poa.id = changes.id
  AND poa.account_id = changes.account_id)
WHERE (changes._CHANGE_TYPE = 'DELETE'
   AND poa.id IS NULL)
   OR (changes._CHANGE_TYPE IN ( 'UPDATE', 'INSERT')
   AND poa.id IS NOT NULL );

A consulta de exemplo usa um LEFT JOIN com a tabela de origem para preservar a ordem. Esta junção garante que os registos de alterações DELETE são ignorados para as linhas eliminadas e, em seguida, recriados no intervalo do histórico de alterações da consulta. O pipeline preserva a linha nova e válida.

Quando elimina linhas, o pipeline preenche a coluna expired_ts na linha do gráfico do Spanner correspondente com a data/hora DELETE da coluna _CHANGE_TIMESTAMP. Uma política de eliminação de linhas (política de TTL) no Spanner elimina qualquer linha em que o valor de expired_ts seja superior a um dia no passado.

Para garantir a fiabilidade do sistema, coordene o horário do pipeline, o período de análise e a política de TTL do Spanner. Agende a execução do pipeline diariamente. A política de TTL do Spanner tem de ter uma duração superior a este intervalo de execução. Isto impede que o pipeline volte a processar um evento DELETE anterior para uma linha já removida pela política de TTL do Spanner.

Este exemplo mostra o intervalo start_timestamp e end_timestamp para consultas diárias que captam todas as alterações da tabela do BigQuery do dia UTC anterior. Como se trata de uma consulta em lote e a função CHANGES tem limitações, a data/hora end_timestamp tem de ser, pelo menos, 10 minutos antes da hora atual. Por conseguinte, agende a execução desta consulta, pelo menos, 10 minutos após a meia-noite UTC. Para mais detalhes, consulte a documentação do CHANGES.

Use colunas TTL com a data/hora da última visualização

Um pipeline de ETL inverso define uma coluna last_seen_ts com a data/hora atual para cada linha na tabela do Spanner. Quando elimina linhas do BigQuery, o Spanner não atualiza as linhas correspondentes, e a coluna last_seen_ts não é alterada. Em seguida, o Spanner remove as linhas com um last_seen_ts desatualizado através de uma política de TTL ou de DML particionada, com base num limite definido. Antes da eliminação agendada, as consultas do Spanner podem filtrar linhas com um valor de last_seen_ts mais antigo do que este limite. Esta abordagem funciona eficazmente quando os dados do gráfico são atualizados rotineiramente e as atualizações em falta indicam dados desatualizados para eliminação.

Faça uma atualização completa

Antes de carregar a partir do BigQuery, pode eliminar tabelas do Spanner para refletir as eliminações nas tabelas de origem. Isto impede que o pipeline carregue linhas eliminadas das tabelas do BigQuery de origem no Spanner durante a próxima execução do pipeline. Esta pode ser a opção mais fácil de implementar. No entanto, considere o tempo necessário para recarregar totalmente os dados do gráfico.

Mantenha um pipeline de ETL inverso em lote agendado

Após a execução inicial do pipeline de ETL inverso, os dados são carregados em massa do BigQuery para o Spanner Graph. Os dados do mundo real continuam a mudar. Os conjuntos de dados mudam e o pipeline adiciona ou remove elementos do gráfico ao longo do tempo. O pipeline descobre novos nós e adiciona novas relações de arestas, ou a inferência de IA gera-as.

Para garantir que a base de dados de grafos do Spanner permanece atualizada, agende e sequencie a orquestração do pipeline do BigQuery através de uma das seguintes opções:

As pipelines do BigQuery permitem-lhe desenvolver, testar, controlar versões e implementar fluxos de trabalho de transformação de dados SQL complexos no BigQuery. Processa nativamente as dependências de ordens, permitindo-lhe definir relações entre as consultas no seu pipeline. O Dataform cria uma árvore de dependências e executa as suas consultas pela ordem correta. Isto garante que as dependências a montante são concluídas antes de as tarefas a jusante serem iniciadas.

Os fluxos de trabalho invocados pelo Cloud Scheduler oferecem uma solução útil e flexível para orquestrar sequências de Google Cloud serviços, incluindo consultas do BigQuery. Defina um fluxo de trabalho como uma série de passos que executam cada um um trabalho do BigQuery. Pode usar o Cloud Scheduler para invocar estes fluxos de trabalho num horário definido. Gerir dependências através da definição do fluxo de trabalho para especificar a ordem de execução, implementar lógica condicional, processar erros e transmitir resultados de uma consulta para outra.

As consultas agendadas, também conhecidas como tarefas de transferência do BigQuery, no BigQuery permitem-lhe executar declarações SQL de forma recorrente. As consultas agendadas não oferecem um processamento de erros robusto nem uma gestão de dependências dinâmicas.

ETL inverso com consultas contínuas do BigQuery

A funcionalidade de consultas contínuas do BigQuery permite-lhe executar operações do BigQuery praticamente em tempo real. A combinação de EXPORT DATA com consultas contínuas oferece um método alternativo para executar pipelines de ETL inverso que evita tarefas em lote agendadas.

Uma consulta contínua é uma consulta de execução prolongada que monitoriza uma tabela do BigQuery de origem para novas linhas. Quando o BigQuery deteta novas linhas anexadas à tabela, transmite os resultados da consulta para a operação EXPORT DATA.

Esta abordagem oferece as seguintes vantagens.

  • Sincronização de dados quase em tempo real: as novas linhas no BigQuery refletem-se no Spanner com um atraso mínimo.

  • Sobrecarga de processamento em lote reduzida: uma consulta contínua elimina a necessidade de tarefas em lote periódicas, o que reduz a sobrecarga computacional.

  • Atualizações orientadas por eventos: atualizações de dados do Spanner em resposta a alterações reais no BigQuery.

Um pipeline de consulta contínua requer uma atribuição de reserva de horário com o job_type de CONTINUOUS. Atribua esta função ao nível do projeto ou da pasta ou ao nível da organização.

Crie uma consulta contínua com ETL inverso do BigQuery para o Spanner

Configure o parâmetro start_timestamp da função APPENDS para iniciar o processamento de dados no ponto em que o carregamento em lote foi interrompido. Esta função capta todas as linhas criadas no intervalo de tempo específico. No exemplo seguinte, o pipeline define arbitrariamente o ponto de partida para 10 minutos antes do CURRENT_TIME. Esta data/hora tem de estar dentro do período de viagem no tempo do BigQuery.

Existem vários métodos para iniciar um pipeline de consultas contínuas, incluindo os seguintes:

  1. No BigQuery Studio, selecionando Mais e escolhendo Consulta contínua em Escolher modo de consulta.

  2. Use a CLI bq e forneça a opção --continuous=true.

EXPORT DATA OPTIONS ( uri="https://spanner.googleapis.com/projects/PROJECT_ID/instances/INSTANCE_ID/databases/DATABASE_ID",
  format="CLOUD_SPANNER",
  spanner_options="""{
      "table": "PersonOwnAccount",
      "priority": "HIGH",
      "tag": "reverse-etl-continuous",
      "change_timestamp_column": "create_time"
   }"""
)
AS SELECT id, account_id, _CHANGE_TIMESTAMP as create_time
  FROM
APPENDS(TABLE `PROJECT_ID.DATASET_NAME.PersonOwnAccount`,
  CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE )

A ordem de carregamento não é garantida

Os dados do gráfico do Spanner consistem em várias tabelas de entrada. Tem de cumprir uma ordem de carregamento rigorosa quando as tabelas têm restrições de integridade referencial. No entanto, as consultas contínuas concorrentes não podem controlar a ordem em que o Spanner adiciona linhas. Como resultado, o carregamento de dados do Spanner Graph através de consultas contínuas destina-se apenas a esquemas de grafos com restrições de integridade referencial flexíveis.

Integração com pipelines existentes

A consulta contínua complementa as tarefas em lote agendadas existentes. Por exemplo, use a consulta contínua para atualizações quase em tempo real e tarefas agendadas para sincronização ou conciliação de dados completa.

Use a consulta contínua do BigQuery para criar pipelines de ETL inverso responsivos e atualizados para sincronizar dados entre o BigQuery e o Spanner Graph.

Considerações sobre consultas contínuas

  • Custo: as consultas contínuas incorrem em custos pela execução contínua de consultas e pela transmissão de dados.

  • Processamento de erros: um pipeline de consulta contínua é cancelado se encontrar erros de base de dados, como uma chave principal duplicada ou uma violação da integridade referencial. Se um pipeline falhar, tem de corrigir manualmente os dados na tabela de origem do BigQuery antes de reiniciar a consulta.

  • Eliminações e atualizações não processadas: a função APPENDS apenas captura inserções. Não captura eliminações nem atualizações.

Siga as práticas recomendadas de ETL inverso

Para obter os melhores resultados, faça o seguinte.

  • Escolha uma estratégia para evitar erros de integridade referencial quando carregar dados de arestas.

  • Conceba o seu pipeline de dados geral para evitar arestas pendentes. As arestas pendentes podem comprometer a eficiência das consultas do gráfico do Spanner e a integridade da estrutura do gráfico. Para mais informações, consulte o artigo sobre como evitar arestas pendentes.

  • Siga as recomendações de otimização da exportação do Spanner.

  • Se estiver a carregar uma grande quantidade de dados, considere dividir o pipeline em vários pipelines mais pequenos para evitar atingir a quota de tempo de execução de consultas do BigQuery predefinida de seis horas. Para mais informações, consulte os limites de tarefas de consulta do BigQuery.

  • Para carregamentos de dados grandes, adicione índices e restrições de chave externa após a conclusão do carregamento de dados em massa inicial. Esta prática melhora o desempenho do carregamento de dados porque as restrições de chaves externas requerem leituras adicionais para validação e os índices requerem escritas adicionais. Estas operações aumentam o número de participantes da transação, o que pode abrandar o processo de carregamento de dados.

  • Ative o escalamento automático no Spanner para acelerar os tempos de carregamento de dados numa instância. Em seguida, configure o parâmetro do Spanner na secção spanner_options do comando EXPORT DATA do BigQuery para HIGH.priority Para mais informações, consulte Vista geral do ajuste de escala automático do Spanner, Configure exportações com a opção spanner_options e RequestOptions.priority.

  • Para carregamentos de dados grandes, crie pontos de divisão para dividir previamente a sua base de dados. Isto prepara o Spanner para um débito aumentado.

  • Configure a prioridade do pedido do Spanner para o carregamento de dados na definição do pipeline.

O que se segue?