Esta página descreve como usar o conetor do Dataflow para importar, exportar e modificar dados em bases de dados do Spanner com o dialeto GoogleSQL e bases de dados com o dialeto PostgreSQL.
O Dataflow é um serviço gerido para transformar e enriquecer dados. O conetor do Dataflow para o Spanner permite-lhe ler dados do Spanner e escrever dados no Spanner num pipeline do Dataflow, transformando ou modificando opcionalmente os dados. Também pode criar pipelines que transferem dados entre o Spanner e outros Google Cloud produtos.
O conetor do Dataflow é o método recomendado para mover dados de forma eficiente para dentro e para fora do Spanner em massa. Também é o método recomendado para fazer transformações grandes numa base de dados que não são suportadas pela DML particionada, como movimentos de tabelas e eliminações em massa que requerem uma JOIN. Quando trabalha com bases de dados individuais, existem outros métodos que pode usar para importar e exportar dados:
- Use a Google Cloud consola para exportar uma base de dados individual do Spanner para o Cloud Storage no formato Avro.
- Use a Google Cloud consola para importar uma base de dados novamente para o Spanner a partir de ficheiros que exportou para o Cloud Storage.
- Use a API REST ou a CLI Google Cloud para executar tarefas de exportação ou importação do Spanner para o Cloud Storage e vice-versa, também usando o formato Avro.
O conector do Dataflow para o Spanner faz parte do SDK Java do Apache Beam e fornece uma API para realizar as ações anteriores. Para mais informações sobre alguns dos conceitos abordados nesta página, como objetos PCollection
e transformações, consulte o guia de programação do Apache Beam.
Adicione o conetor ao seu projeto Maven
Para adicionar o conetor do Google Cloud Dataflow a um projeto do Maven, adicione o artefacto do beam-sdks-java-io-google-cloud-platform
Maven ao seu ficheiro pom.xml
como uma dependência.
Por exemplo, partindo do princípio de que o ficheiro pom.xml
define beam.version
para o número da versão adequado, adicionaria a seguinte dependência:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
Ler dados do Spanner
Para ler a partir do Spanner, aplique a transformação SpannerIO.read
. Configure a leitura através dos métodos na classe SpannerIO.Read
. A aplicação da transformação devolve um PCollection<Struct>
, em que cada elemento na coleção representa uma linha individual devolvida pela operação de leitura. Pode ler a partir do Spanner com e sem uma consulta SQL específica, consoante o resultado necessário.
A aplicação da transformação SpannerIO.read
devolve uma vista consistente dos dados através da realização de uma leitura forte. Salvo indicação em contrário, o resultado da leitura é capturado no momento em que iniciou a leitura. Consulte leituras para obter mais informações sobre os diferentes tipos de leituras que o Spanner pode realizar.
Leia dados através de uma consulta
Para ler um conjunto específico de dados do Spanner, configure a transformação através do método SpannerIO.Read.withQuery
para especificar uma consulta SQL. Por exemplo:
Ler dados sem especificar uma consulta
Para ler a partir de uma base de dados sem usar uma consulta, pode especificar um nome de tabela usando o método SpannerIO.Read.withTable
e especificar uma lista de colunas a ler usando o método SpannerIO.Read.withColumns
. Por exemplo:
GoogleSQL
PostgreSQL
Para limitar as linhas lidas, pode especificar um conjunto de chaves principais a ler através do método
SpannerIO.Read.withKeySet
.
Também pode ler uma tabela através de um índice secundário especificado. Tal como acontece com a chamada da API
readUsingIndex
,
o índice tem de conter todos os dados que aparecem nos resultados da consulta.
Para o fazer, especifique a tabela conforme mostrado no exemplo anterior e especifique o índice que contém os valores das colunas necessárias através do método SpannerIO.Read.withIndex
. O índice tem de armazenar todas as colunas que a transformação precisa de ler. A chave principal da tabela de base é armazenada implicitamente. Por exemplo, para ler a tabela Songs
usando o índice
SongsBySongName
, usa o seguinte código:
GoogleSQL
PostgreSQL
Controle a antiguidade dos dados de transações
É garantido que uma transformação é executada num instantâneo de dados consistente. Para controlar a obsolescência dos dados, use o método SpannerIO.Read.withTimestampBound
. Consulte as
transações para mais informações.
Ler a partir de várias tabelas na mesma transação
Se quiser ler dados de várias tabelas no mesmo momento para garantir a consistência dos dados, execute todas as leituras numa única transação. Para o fazer, aplique uma transformação createTransaction
, criando um objeto PCollectionView<Transaction>
que, em seguida, cria uma transação. A visualização resultante pode ser transmitida a uma operação de leitura através de SpannerIO.Read.withTransaction
.
GoogleSQL
PostgreSQL
Ler dados de todas as tabelas disponíveis
Pode ler dados de todas as tabelas disponíveis numa base de dados do Spanner.
GoogleSQL
PostgreSQL
Resolva problemas de consultas não suportadas
O conetor do Dataflow só suporta consultas SQL do Spanner em que o primeiro operador no plano de execução da consulta é um Distributed
Union. Se tentar ler dados do Spanner através de uma consulta e receber uma exceção a indicar que a consulta does not have a
DistributedUnion at the root
, siga os passos em Compreenda como o Spanner executa consultas para obter um plano de execução para a sua consulta através da consola Google Cloud .
Se a sua consulta SQL não for suportada, simplifique-a para uma consulta que tenha uma união distribuída como o primeiro operador no plano de execução da consulta. Remova funções agregadas, junções de tabelas, bem como os operadores DISTINCT
, GROUP BY
e ORDER
, uma vez que são os operadores com maior probabilidade de impedir o funcionamento da consulta.
Crie mutações para uma gravação
Use o método newInsertOrUpdateBuilder
da classe Mutation
em vez do método newInsertBuilder
, a menos que seja absolutamente necessário para pipelines Java. Para pipelines Python, use
SpannerInsertOrUpdate
em vez de
SpannerInsert
. O Dataflow oferece garantias de, pelo menos, uma vez, o que significa que a mutação pode ser escrita várias vezes. Como resultado, INSERT
apenas as mutações podem gerar
com.google.cloud.spanner.SpannerException: ALREADY_EXISTS
erros que fazem com que
o pipeline falhe. Para evitar este erro, use a mutação INSERT_OR_UPDATE
em alternativa, que adiciona uma nova linha ou atualiza os valores das colunas se a linha já existir. A mutação INSERT_OR_UPDATE
pode ser aplicada mais do que uma vez.
Escreva no Spanner e transforme dados
Pode escrever dados no Spanner com o conector do Dataflow usando uma transformação SpannerIO.write
para executar uma coleção de mutações de linhas de entrada. O conector do Dataflow agrupa as mutações em lotes para maior eficiência.
O exemplo seguinte mostra como aplicar uma transformação de escrita a um PCollection
de mutações:
GoogleSQL
PostgreSQL
Se uma transformação parar inesperadamente antes da conclusão, as mutações que já foram aplicadas não são revertidas.
Aplique grupos de mutações de forma atómica
Pode usar a classe MutationGroup
para garantir que um grupo de mutações é aplicado em conjunto de forma atómica. As mutações num
MutationGroup
são garantidamente enviadas na mesma transação, mas a
transação pode ser repetida.
Os grupos de mutações têm o melhor desempenho quando são usados para agrupar mutações que afetam dados armazenados próximos no espaço de chaves. Uma vez que o Spanner intercala os dados da tabela principal e secundária na tabela principal, esses dados estão sempre próximos no espaço de chaves. Recomendamos que estruture o seu grupo de mutações de forma a que contenha uma mutação que seja aplicada a uma tabela principal e mutações adicionais que sejam aplicadas a tabelas secundárias, ou de forma a que todas as suas mutações modifiquem dados que estejam próximos no espaço de chaves. Para mais informações sobre como o Spanner armazena dados de tabelas principais e secundárias, consulte o artigo Esquema e modelo de dados. Se não organizar os grupos de mutações em torno das hierarquias de tabelas recomendadas ou se os dados acedidos não estiverem próximos no espaço de chaves, o Spanner pode ter de executar commits de duas fases, o que resulta num desempenho mais lento. Para mais informações, consulte o artigo Compromissos de localidade.
Para usar MutationGroup
, crie uma transformação SpannerIO.write
e chame o método SpannerIO.Write.grouped
, que devolve uma transformação que pode aplicar a um PCollection
de objetos MutationGroup
.
Quando cria um MutationGroup
, a primeira mutação listada torna-se a mutação principal. Se o seu grupo de mutações afetar uma tabela principal e uma tabela secundária, a mutação principal deve ser uma mutação na tabela principal. Caso contrário,
pode usar qualquer mutação como mutação principal. O conector Dataflow usa a mutação principal para determinar os limites das partições de forma a agrupar as mutações de forma eficiente.
Por exemplo, imagine que a sua aplicação monitoriza o comportamento e sinaliza o comportamento problemático do utilizador para revisão. Para cada comportamento denunciado, quer atualizar a tabela Users
para bloquear o acesso do utilizador à sua aplicação e também tem de registar o incidente na tabela PendingReviews
. Para garantir que ambas as tabelas são atualizadas de forma atómica, use um MutationGroup
:
GoogleSQL
PostgreSQL
Quando cria um grupo de mutações, a primeira mutação fornecida como argumento torna-se a mutação principal. Neste caso, as duas tabelas não estão relacionadas, pelo que não existe uma mutação principal clara. Selecionámos userMutation
como principal colocando-o em primeiro lugar. A aplicação das duas mutações em separado seria mais rápida, mas não garantiria a atomicidade, pelo que o grupo de mutações é a melhor escolha nesta situação.
O que se segue?
- Saiba mais sobre como conceber um pipeline de dados do Apache Beam.
- Exporte e importe bases de dados do Spanner na Google Cloud consola através do Dataflow.