Importare, esportare e modificare i dati utilizzando Dataflow

Questa pagina descrive come utilizzare il connettore Dataflow per Spanner per importare, esportare e modificare i dati nei database di dialetti GoogleSQL e PostgreSQL di Spanner.

Dataflow è un servizio gestito per la trasformazione e l'arricchimento dei dati. Il connettore Dataflow per Spanner consente di leggere e scrivere dati in Spanner in una pipeline Dataflow, trasformando o modificando i dati, se necessario. Puoi anche creare pipeline che trasferiscono dati tra Spanner e altri Google Cloud prodotti.

Il connettore Dataflow è il metodo consigliato per spostare in modo efficiente i dati in blocco in e da Spanner. È anche il metodo consigliato per eseguire trasformazioni di grandi dimensioni su un database non supportate da DML partizionato, come spostamenti di tabelle ed eliminazioni collettive che richiedono un'operazione JOIN. Quando lavori con singoli database, puoi utilizzare altri metodi per importare ed esportare i dati:

  • Utilizzala Google Cloud console per esportare un singolo database da Spanner a Cloud Storage in Avro.
  • Utilizzala Google Cloud console per importare di nuovo un database in Spanner dai file esportati in Cloud Storage.
  • Utilizza l'API REST o Google Cloud CLI per eseguire job di esportazione o importazione da Spanner a Cloud Storage e viceversa, utilizzando anche il formato Avro.

Il connettore Dataflow per Spanner fa parte dell' SDK Apache Beam per Java e fornisce un'API per eseguire le azioni precedenti. Per ulteriori informazioni su alcuni dei concetti trattati in questa pagina, come gli oggetti PCollection e le trasformazioni, consulta la guida alla programmazione di Apache Beam.

Aggiungere il connettore al progetto Maven

Per aggiungere il Google Cloud connettore Dataflow a un progetto Maven, aggiungi l'artefatto Maven beam-sdks-java-io-google-cloud-platform al file pom.xml come dipendenza.

Ad esempio, supponendo che il file pom.xml imposti beam.version sul numero di versione appropriato, devi aggiungere la seguente dipendenza:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>

Leggere i dati da Spanner

Per leggere da Spanner, applica la SpannerIO.read trasformazione. Configura la lettura utilizzando i metodi della SpannerIO.Read classe. L'applicazione della trasformazione restituisce un PCollection<Struct>, in cui ogni elemento della raccolta rappresenta una singola riga restituita dall'operazione di lettura. Puoi leggere da Spanner con e senza una query SQL specifica, a seconda dell'output necessario.

L'applicazione della trasformazione SpannerIO.read restituisce una visualizzazione coerente dei dati eseguendo una lettura forte. Se non diversamente specificato, il risultato della lettura viene acquisito tramite snapshot al momento dell'avvio della lettura. Per ulteriori informazioni sui diversi tipi di letture che Spanner può eseguire, consulta la sezione Letture.

Leggere i dati utilizzando una query

Per leggere un insieme specifico di dati da Spanner, configura la trasformazione utilizzando il SpannerIO.Read.withQuery metodo per specificare una query SQL. Ad esempio:

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withQuery("SELECT * FROM " + options.getTable()));

Leggere i dati senza specificare una query

Per leggere da un database senza utilizzare una query, puoi specificare un nome di tabella utilizzando il metodo SpannerIO.Read.withTable e specificare un elenco di colonne da leggere utilizzando il metodo SpannerIO.Read.withColumns. Ad esempio:

GoogleSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("Singers")
        .withColumns("singerId", "firstName", "lastName"));

PostgreSQL

// Query for all the columns and rows in the specified Spanner table
PCollection<Struct> records = pipeline.apply(
    SpannerIO.read()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withTable("singers")
        .withColumns("singer_id", "first_name", "last_name"));

Per limitare le righe lette, puoi specificare un insieme di chiavi primarie da leggere utilizzando il SpannerIO.Read.withKeySet metodo.

Puoi anche leggere una tabella utilizzando un indice secondario specificato. Come per la chiamata API readUsingIndex, l'indice deve contenere tutti i dati visualizzati nei risultati della query.

Per farlo, specifica la tabella come mostrato nell'esempio precedente e specifica l' indice che contiene i valori delle colonne necessari utilizzando il SpannerIO.Read.withIndex metodo. L'indice deve memorizzare tutte le colonne che la trasformazione deve leggere. La chiave primaria della tabella di base viene memorizzata implicitamente. Ad esempio, per leggere la tabella Songs utilizzando l'indice SongsBySongName, utilizza il seguente codice:

GoogleSQL

// Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the Songs table,
            .withColumns("SingerId", "AlbumId", "TrackId", "SongName"));

PostgreSQL

// // Read the indexed columns from all rows in the specified index.
PCollection<Struct> records =
    pipeline.apply(
        SpannerIO.read()
            .withInstanceId(instanceId)
            .withDatabaseId(databaseId)
            .withTable("Songs")
            .withIndex("SongsBySongName")
            // Can only read columns that are either indexed, STORED in the index or
            // part of the primary key of the songs table,
            .withColumns("singer_id", "album_id", "track_id", "song_name"));

Controllare la vetustà dei dati delle transazioni

È garantito che una trasformazione venga eseguita su uno snapshot coerente dei dati. Per controllare la vetustà dei dati, utilizza il SpannerIO.Read.withTimestampBound metodo. Per ulteriori informazioni, consulta la sezione Transazioni.

Leggere da più tabelle nella stessa transazione

Se vuoi leggere i dati da più tabelle nello stesso momento per garantire la coerenza dei dati, esegui tutte le letture in un'unica transazione. Per farlo, applica una createTransaction trasformazione, creando un oggetto PCollectionView<Transaction> che crea una transazione. La visualizzazione risultante può essere passata a un'operazione di lettura utilizzando SpannerIO.Read.withTransaction.

GoogleSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerID, FirstName, LastName FROM Singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT SingerId, AlbumId, AlbumTitle FROM Albums")
            .withTransaction(tx));

PostgreSQL

SpannerConfig spannerConfig =
    SpannerConfig.create().withInstanceId(instanceId).withDatabaseId(databaseId);
PCollectionView<Transaction> tx =
    pipeline.apply(
        SpannerIO.createTransaction()
            .withSpannerConfig(spannerConfig)
            .withTimestampBound(TimestampBound.strong()));
PCollection<Struct> singers =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, first_name, last_name FROM singers")
            .withTransaction(tx));
PCollection<Struct> albums =
    pipeline.apply(
        SpannerIO.read()
            .withSpannerConfig(spannerConfig)
            .withQuery("SELECT singer_id, album_id, album_title FROM albums")
            .withTransaction(tx));

Leggere i dati da tutte le tabelle disponibili

Puoi leggere i dati da tutte le tabelle disponibili in un database Spanner.

GoogleSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    "SELECT t.table_name FROM information_schema.tables AS t WHERE t"
                        + ".table_catalog = '' AND t.table_schema = ''"))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create().withQuery("SELECT * FROM " + tableName);
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

PostgreSQL

PCollection<Struct> allRecords =
    pipeline
        .apply(
            SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withBatching(false)
                .withQuery(
                    Statement.newBuilder(
                            "SELECT t.table_name FROM information_schema.tables AS t "
                                + "WHERE t.table_catalog = $1 AND t.table_schema = $2")
                        .bind("p1")
                        .to(spannerConfig.getDatabaseId().get())
                        .bind("p2")
                        .to("public")
                        .build()))
        .apply(
            MapElements.into(TypeDescriptor.of(ReadOperation.class))
                .via(
                    (SerializableFunction<Struct, ReadOperation>)
                        input -> {
                          String tableName = input.getString(0);
                          return ReadOperation.create()
                              .withQuery("SELECT * FROM \"" + tableName + "\"");
                        }))
        .apply(SpannerIO.readAll().withSpannerConfig(spannerConfig));

Risolvere i problemi relativi alle domande non supportate

Il connettore Dataflow supporta solo le query SQL di Spanner in cui il primo operatore nel piano di esecuzione della query è un'unione distribuita. Se tenti di leggere i dati da Spanner utilizzando una query e ricevi un'eccezione che indica che la query does not have a DistributedUnion at the root, segui i passaggi descritti in Comprendere come Spanner esegue le query per recuperare un piano di esecuzione per la query utilizzando la Google Cloud console.

Se la query SQL non è supportata, semplificala in una query che abbia un'unione distribuita come primo operatore nel piano di esecuzione della query. Rimuovi le funzioni di aggregazione, le unioni di tabelle e gli operatori DISTINCT, GROUP BY e ORDER, in quanto sono gli operatori che più probabilmente impediscono il funzionamento della query.

Creare mutazioni per una scrittura

Utilizza il metodo Mutation della classe newInsertOrUpdateBuilder anziché il metodo newInsertBuilder a meno che non sia assolutamente necessario per le pipeline Java. Per le pipeline Python, utilizza SpannerInsertOrUpdate anziché SpannerInsert. Dataflow fornisce garanzie di tipo "almeno una volta", il che significa che la mutazione potrebbe essere scritta più volte. Di conseguenza, le mutazioni INSERT potrebbero generare com.google.cloud.spanner.SpannerException: ALREADY_EXISTS errori che causano il fallimento della pipeline. Per evitare questo errore, utilizza invece la mutazione INSERT_OR_UPDATE, che aggiunge una nuova riga o aggiorna i valori delle colonne se la riga esiste già. La mutazione INSERT_OR_UPDATE può essere applicata più di una volta.

Scrivere in Spanner e trasformare i dati

Puoi scrivere dati in Spanner con il connettore Dataflow utilizzando una trasformazione SpannerIO.write per eseguire una raccolta di mutazioni di righe di input. Il connettore Dataflow raggruppa le mutazioni in batch per una maggiore efficienza.

L'esempio seguente mostra come applicare una trasformazione di scrittura a un oggetto PCollection di mutazioni:

GoogleSQL

albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId));

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
albums
    // Spanner expects a Mutation object, so create it using the Album's data
    .apply("CreateAlbumMutation", ParDo.of(new DoFn<Album, Mutation>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        Album album = c.element();
        c.output(Mutation.newInsertOrUpdateBuilder("albums")
            .set("singerId").to(album.singerId)
            .set("albumId").to(album.albumId)
            .set("albumTitle").to(album.albumTitle)
            .build());
      }
    }))
    // Write mutations to Spanner
    .apply("WriteAlbums", SpannerIO.write()
        .withInstanceId(instanceId)
        .withDatabaseId(databaseId)
        .withDialectView(dialectView));

Se una trasformazione si interrompe in modo imprevisto prima del completamento, le mutazioni già applicate non vengono annullate.

Applicare gruppi di mutazioni in modo atomico

Puoi utilizzare la classe MutationGroup per assicurarti che un gruppo di mutazioni venga applicato in modo atomico. È garantito che le mutazioni in un oggetto MutationGroup vengano inviate nella stessa transazione, ma la transazione potrebbe essere ritentata.

I gruppi di mutazioni funzionano meglio quando vengono utilizzati per raggruppare le mutazioni che interessano i dati archiviati in posizioni vicine nello spazio delle chiavi. Poiché Spanner interleave i dati delle tabelle padre e figlio nella tabella padre, questi dati sono sempre vicini nello spazio delle chiavi. Ti consigliamo di strutturare il gruppo di mutazioni in modo che contenga una mutazione applicata a una tabella padre e mutazioni aggiuntive applicate alle tabelle figlio oppure in modo che tutte le mutazioni modifichino i dati vicini nello spazio delle chiavi. Per ulteriori informazioni su come Spanner archivia i dati delle tabelle padre e figlio, consulta Schema e modello di dati. Se non organizzi i gruppi di mutazioni in base alle gerarchie di tabelle consigliate o se i dati a cui accedi non sono vicini nello spazio delle chiavi, Spanner potrebbe dover eseguire commit in due fasi, con conseguente rallentamento delle prestazioni. Per ulteriori informazioni, consulta Compromessi di località.

Per utilizzare MutationGroup, crea una trasformazione SpannerIO.write e chiama il SpannerIO.Write.grouped metodo, che restituisce una trasformazione che puoi quindi applicare a un PCollection di oggetti MutationGroup.

Quando crei un oggetto MutationGroup, la prima mutazione elencata diventa la mutazione principale. Se il gruppo di mutazioni interessa sia una tabella padre sia una tabella figlio, la mutazione principale deve essere una mutazione della tabella padre. In caso contrario, puoi utilizzare qualsiasi mutazione come mutazione principale. Il connettore Dataflow utilizza la mutazione principale per determinare i limiti delle partizioni al fine di raggruppare in batch le mutazioni in modo efficiente.

Ad esempio, supponiamo che la tua applicazione monitori il comportamento e contrassegni i comportamenti problematici degli utenti per la revisione. Per ogni comportamento contrassegnato, vuoi aggiornare la tabella Users per bloccare l'accesso dell'utente alla tua applicazione e devi anche registrare l'incidente nella tabella PendingReviews. Per assicurarti che entrambe le tabelle vengano aggiornate in modo atomico, utilizza un oggetto MutationGroup:

GoogleSQL

PCollection<MutationGroup> mutations =
    suspiciousUserIds.apply(
        MapElements.via(
            new SimpleFunction<>() {

              @Override
              public MutationGroup apply(String userId) {
                // Immediately block the user.
                Mutation userMutation =
                    Mutation.newUpdateBuilder("Users")
                        .set("id")
                        .to(userId)
                        .set("state")
                        .to("BLOCKED")
                        .build();
                long generatedId =
                    Hashing.sha1()
                        .newHasher()
                        .putString(userId, Charsets.UTF_8)
                        .putLong(timestamp.getSeconds())
                        .putLong(timestamp.getNanos())
                        .hash()
                        .asLong();

                // Add an entry to pending review requests.
                Mutation pendingReview =
                    Mutation.newInsertOrUpdateBuilder("PendingReviews")
                        .set("id")
                        .to(generatedId) // Must be deterministically generated.
                        .set("userId")
                        .to(userId)
                        .set("action")
                        .to("REVIEW ACCOUNT")
                        .set("note")
                        .to("Suspicious activity detected.")
                        .build();

                return MutationGroup.create(userMutation, pendingReview);
              }
            }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .grouped());

PostgreSQL

PCollectionView<Dialect> dialectView =
    pipeline.apply(Create.of(Dialect.POSTGRESQL)).apply(View.asSingleton());
PCollection<MutationGroup> mutations = suspiciousUserIds
    .apply(MapElements.via(new SimpleFunction<String, MutationGroup>() {

      @Override
      public MutationGroup apply(String userId) {
        // Immediately block the user.
        Mutation userMutation = Mutation.newUpdateBuilder("Users")
            .set("id").to(userId)
            .set("state").to("BLOCKED")
            .build();
        long generatedId = Hashing.sha1().newHasher()
            .putString(userId, Charsets.UTF_8)
            .putLong(timestamp.getSeconds())
            .putLong(timestamp.getNanos())
            .hash()
            .asLong();

        // Add an entry to pending review requests.
        Mutation pendingReview = Mutation.newInsertOrUpdateBuilder("PendingReviews")
            .set("id").to(generatedId)  // Must be deterministically generated.
            .set("userId").to(userId)
            .set("action").to("REVIEW ACCOUNT")
            .set("note").to("Suspicious activity detected.")
            .build();

        return MutationGroup.create(userMutation, pendingReview);
      }
    }));

mutations.apply(SpannerIO.write()
    .withInstanceId(instanceId)
    .withDatabaseId(databaseId)
    .withDialectView(dialectView)
    .grouped());

Quando crei un gruppo di mutazioni, la prima mutazione fornita come argomento diventa la mutazione principale. In questo caso, le due tabelle non sono correlate, quindi non esiste una mutazione principale chiara. Abbiamo selezionato userMutation come principale inserendola per prima. L'applicazione delle due mutazioni separatamente sarebbe più veloce, ma non garantirebbe l'atomicità, quindi il gruppo di mutazioni è la scelta migliore in questa situazione.

Passaggi successivi