Importe, exporte e modifique dados através do Dataflow

Esta página descreve como usar o conetor do Dataflow para importar, exportar e modificar dados em bases de dados do Spanner com o dialeto GoogleSQL e bases de dados com o dialeto PostgreSQL.

O Dataflow é um serviço gerido para transformar e enriquecer dados. O conetor do Dataflow para o Spanner permite-lhe ler dados do Spanner e escrever dados no Spanner num pipeline do Dataflow, transformando ou modificando opcionalmente os dados. Também pode criar pipelines que transferem dados entre o Spanner e outros Google Cloud produtos.

O conetor do Dataflow é o método recomendado para mover dados de forma eficiente para dentro e para fora do Spanner em massa. Também é o método recomendado para fazer transformações grandes numa base de dados que não são suportadas pela DML particionada, como movimentos de tabelas e eliminações em massa que requerem uma JOIN. Quando trabalha com bases de dados individuais, existem outros métodos que pode usar para importar e exportar dados:

  • Use a Google Cloud consola para exportar uma base de dados individual do Spanner para o Cloud Storage no formato Avro.
  • Use a Google Cloud consola para importar uma base de dados novamente para o Spanner a partir de ficheiros que exportou para o Cloud Storage.
  • Use a API REST ou a CLI Google Cloud para executar tarefas de exportação ou importação do Spanner para o Cloud Storage e vice-versa, também usando o formato Avro.

O conector do Dataflow para o Spanner faz parte do SDK Java do Apache Beam e fornece uma API para realizar as ações anteriores. Para mais informações sobre alguns dos conceitos abordados nesta página, como objetos PCollection e transformações, consulte o guia de programação do Apache Beam.

Adicione o conetor ao seu projeto Maven

Para adicionar o conetor do Google Cloud Dataflow a um projeto do Maven, adicione o artefacto do beam-sdks-java-io-google-cloud-platform Maven ao seu ficheiro pom.xml como uma dependência.

Por exemplo, partindo do princípio de que o ficheiro pom.xml define beam.version para o número da versão adequado, adicionaria a seguinte dependência:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Ler dados do Spanner

Para ler a partir do Spanner, aplique a transformação SpannerIO.read. Configure a leitura através dos métodos na classe SpannerIO.Read. A aplicação da transformação devolve um PCollection<Struct>, em que cada elemento na coleção representa uma linha individual devolvida pela operação de leitura. Pode ler a partir do Spanner com e sem uma consulta SQL específica, consoante o resultado necessário.

A aplicação da transformação SpannerIO.read devolve uma vista consistente dos dados através da realização de uma leitura forte. Salvo indicação em contrário, o resultado da leitura é capturado no momento em que iniciou a leitura. Consulte leituras para obter mais informações sobre os diferentes tipos de leituras que o Spanner pode realizar.

Leia dados através de uma consulta

Para ler um conjunto específico de dados do Spanner, configure a transformação através do método SpannerIO.Read.withQuery para especificar uma consulta SQL. Por exemplo:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Ler dados sem especificar uma consulta

Para ler a partir de uma base de dados sem usar uma consulta, pode especificar um nome de tabela usando o método SpannerIO.Read.withTable e especificar uma lista de colunas a ler usando o método SpannerIO.Read.withColumns. Por exemplo:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

Para limitar as linhas lidas, pode especificar um conjunto de chaves principais a ler através do método SpannerIO.Read.withKeySet.

Também pode ler uma tabela através de um índice secundário especificado. Tal como acontece com a chamada da API readUsingIndex, o índice tem de conter todos os dados que aparecem nos resultados da consulta.

Para o fazer, especifique a tabela conforme mostrado no exemplo anterior e especifique o índice que contém os valores das colunas necessárias através do método SpannerIO.Read.withIndex. O índice tem de armazenar todas as colunas que a transformação precisa de ler. A chave principal da tabela de base é armazenada implicitamente. Por exemplo, para ler a tabela Songs usando o índice SongsBySongName, usa o seguinte código:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

Controle a antiguidade dos dados de transações

É garantido que uma transformação é executada num instantâneo de dados consistente. Para controlar a obsolescência dos dados, use o método SpannerIO.Read.withTimestampBound. Consulte as transações para mais informações.

Ler a partir de várias tabelas na mesma transação

Se quiser ler dados de várias tabelas no mesmo momento para garantir a consistência dos dados, execute todas as leituras numa única transação. Para o fazer, aplique uma transformação createTransaction, criando um objeto PCollectionView<Transaction> que, em seguida, cria uma transação. A visualização resultante pode ser transmitida a uma operação de leitura através de SpannerIO.Read.withTransaction.

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

Ler dados de todas as tabelas disponíveis

Pode ler dados de todas as tabelas disponíveis numa base de dados do Spanner.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Resolva problemas de consultas não suportadas

O conetor do Dataflow só suporta consultas SQL do Spanner em que o primeiro operador no plano de execução da consulta é um Distributed Union. Se tentar ler dados do Spanner através de uma consulta e receber uma exceção a indicar que a consulta does not have a DistributedUnion at the root, siga os passos em Compreenda como o Spanner executa consultas para obter um plano de execução para a sua consulta através da consola Google Cloud .

Se a sua consulta SQL não for suportada, simplifique-a para uma consulta que tenha uma união distribuída como o primeiro operador no plano de execução da consulta. Remova funções agregadas, junções de tabelas, bem como os operadores DISTINCT, GROUP BY e ORDER, uma vez que são os operadores com maior probabilidade de impedir o funcionamento da consulta.

Crie mutações para uma gravação

Use o método newInsertOrUpdateBuilder da classe Mutation em vez do método newInsertBuilder, a menos que seja absolutamente necessário para pipelines Java. Para pipelines Python, use SpannerInsertOrUpdate em vez de SpannerInsert. O Dataflow oferece garantias de, pelo menos, uma vez, o que significa que a mutação pode ser escrita várias vezes. Como resultado, INSERT apenas as mutações podem gerar com.google.cloud.spanner.SpannerException: ALREADY_EXISTS erros que fazem com que o pipeline falhe. Para evitar este erro, use a mutação INSERT_OR_UPDATE em alternativa, que adiciona uma nova linha ou atualiza os valores das colunas se a linha já existir. A mutação INSERT_OR_UPDATE pode ser aplicada mais do que uma vez.

Escreva no Spanner e transforme dados

Pode escrever dados no Spanner com o conector do Dataflow usando uma transformação SpannerIO.write para executar uma coleção de mutações de linhas de entrada. O conector do Dataflow agrupa as mutações em lotes para maior eficiência.

O exemplo seguinte mostra como aplicar uma transformação de escrita a um PCollection de mutações:

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

Se uma transformação parar inesperadamente antes da conclusão, as mutações que já foram aplicadas não são revertidas.

Aplique grupos de mutações de forma atómica

Pode usar a classe MutationGroup para garantir que um grupo de mutações é aplicado em conjunto de forma atómica. As mutações num MutationGroup são garantidamente enviadas na mesma transação, mas a transação pode ser repetida.

Os grupos de mutações têm o melhor desempenho quando são usados para agrupar mutações que afetam dados armazenados próximos no espaço de chaves. Uma vez que o Spanner intercala os dados da tabela principal e secundária na tabela principal, esses dados estão sempre próximos no espaço de chaves. Recomendamos que estruture o seu grupo de mutações de forma a que contenha uma mutação que seja aplicada a uma tabela principal e mutações adicionais que sejam aplicadas a tabelas secundárias, ou de forma a que todas as suas mutações modifiquem dados que estejam próximos no espaço de chaves. Para mais informações sobre como o Spanner armazena dados de tabelas principais e secundárias, consulte o artigo Esquema e modelo de dados. Se não organizar os grupos de mutações em torno das hierarquias de tabelas recomendadas ou se os dados acedidos não estiverem próximos no espaço de chaves, o Spanner pode ter de executar commits de duas fases, o que resulta num desempenho mais lento. Para mais informações, consulte o artigo Compromissos de localidade.

Para usar MutationGroup, crie uma transformação SpannerIO.write e chame o método SpannerIO.Write.grouped, que devolve uma transformação que pode aplicar a um PCollection de objetos MutationGroup.

Quando cria um MutationGroup, a primeira mutação listada torna-se a mutação principal. Se o seu grupo de mutações afetar uma tabela principal e uma tabela secundária, a mutação principal deve ser uma mutação na tabela principal. Caso contrário, pode usar qualquer mutação como mutação principal. O conector Dataflow usa a mutação principal para determinar os limites das partições de forma a agrupar as mutações de forma eficiente.

Por exemplo, imagine que a sua aplicação monitoriza o comportamento e sinaliza o comportamento problemático do utilizador para revisão. Para cada comportamento denunciado, quer atualizar a tabela Users para bloquear o acesso do utilizador à sua aplicação e também tem de registar o incidente na tabela PendingReviews. Para garantir que ambas as tabelas são atualizadas de forma atómica, use um MutationGroup:

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

Quando cria um grupo de mutações, a primeira mutação fornecida como argumento torna-se a mutação principal. Neste caso, as duas tabelas não estão relacionadas, pelo que não existe uma mutação principal clara. Selecionámos userMutation como principal colocando-o em primeiro lugar. A aplicação das duas mutações em separado seria mais rápida, mas não garantiria a atomicidade, pelo que o grupo de mutações é a melhor escolha nesta situação.

O que se segue?