Creare connessioni ai modifiche in tempo reale utilizzando Dataflow

Questa pagina mostra come creare pipeline Dataflow che utilizzano e inoltrano i dati delle modifiche di Spanner utilizzando le modifiche in tempo reale. Puoi utilizzare il codice di esempio in questa pagina per creare pipeline personalizzate.

Concetti principali

Di seguito sono riportati alcuni concetti di base per le pipeline Dataflow per modifiche in tempo realee.

Dataflow

Dataflow è un servizio serverless, veloce e conveniente che supporta l'elaborazione sia in modalità flusso che batch. Offre portabilità con i job di elaborazione scritti utilizzando le librerie open source Apache Beam e automatizza il provisioning dell'infrastruttura e la gestione dei cluster. Dataflow fornisce lo streaming quasi in tempo reale durante la lettura dai modifiche in tempo reale.

Puoi utilizzare Dataflow per utilizzare i flussi di modifiche di Spanner con il connettore SpannerIO, che offre un'astrazione sull'API Spanner per l'esecuzione di query sui modifiche in tempo reale. Con questo connettore, non devi gestire il ciclo di vita delle partizionimodifiche in tempo realehe, necessario quando utilizzi l'API Spanner direttamente. Il connettore fornisce un flusso di record di modifica dei dati in modo che tu possa concentrarti maggiormente sulla logica dell'applicazione e meno sui dettagli specifici dell'API e sul partizionamento dinamico del flusso di modifiche. Nella maggior parte dei casi in cui devi leggere i dati dello stream delle modifiche, ti consigliamo di utilizzare il connettore SpannerIO anziché l'API Spanner.

I modelli Dataflow sono pipeline Dataflow predefinite che implementano casi d'uso comuni. Per una panoramica, consulta Modelli Dataflow.

Pipeline Dataflow

Una pipeline Dataflow di modifiche in tempo reale Spanner è composta da quattro parti principali:

  1. Un database Spanner con uno stream di modifiche
  2. Il connettore SpannerIO
  3. Trasformazioni e sink definiti dall'utente
  4. Un writer I/O sink di Apache Beam

immagine

Modifiche in tempo reale Spanner

Per informazioni dettagliate su come creare un flusso di modifiche, vedi Creare un flusso di modifiche.

Connettore Apache Beam SpannerIO

Si tratta del connettore SpannerIO descritto nella sezione precedente di Dataflow. È un connettore I/O di origine che emette un PCollection di record di modifica dei dati alle fasi successive della pipeline. L'ora dell'evento per ogni record di modifica dei dati emesso sarà il timestamp di commit. Tieni presente che i record emessi non sono ordinati e che il connettore SpannerIO garantisce che non ci saranno record in ritardo.

Quando lavora con modifiche in tempo reale, Dataflow utilizza il checkpointing. Di conseguenza, ogni worker potrebbe attendere fino all'intervallo di checkpoint configurato per memorizzare nel buffer le modifiche prima di inviarle per l'ulteriore elaborazione.

Trasformazioni definite dall'utente

Una trasformazione definita dall'utente consente a un utente di aggregare, trasformare o modificare i dati di elaborazione all'interno di una pipeline Dataflow. I casi d'uso comuni sono la rimozione di informazioni che consentono l'identificazione personale, il rispetto dei requisiti di formato dei dati downstream e l'ordinamento. Consulta la documentazione ufficiale di Apache Beam per la guida alla programmazione sulle trasformazioni.

Scrittore I/O sink Apache Beam

Apache Beam contiene connettori I/O integrati che possono essere utilizzati per scrivere da una pipeline Dataflow in un sink di dati come BigQuery. I sink di dati più comuni sono supportati in modo nativo.

Modelli Dataflow

I modelli Dataflow forniscono un metodo per creare job Dataflow basati su immagini Docker predefinite per casi d'uso comuni utilizzando la console Google Cloud , la CLI Google Cloud o le chiamate API REST.

Per modifiche in tempo reale di Spanner, forniamo tre modelli Dataflow flessibili:

Quando utilizzi il modello Spanner modifiche in tempo reale to Pub/Sub, si applicano le seguenti limitazioni:

Imposta le autorizzazioni IAM per i modelli Dataflow

Prima di creare un job Dataflow con i tre modelli flessibili elencati, assicurati di disporre delle autorizzazioni IAM richieste per i seguenti service account:

Se non disponi delle autorizzazioni IAM richieste, devi specificare un service account worker gestito dall'utente per creare il job Dataflow. Per saperne di più, vedi Sicurezza e autorizzazioni di Dataflow.

Quando provi a eseguire un job da un modello flessibile Dataflow senza tutte le autorizzazioni richieste, il job potrebbe non riuscire con un errore di lettura del file dei risultati o un errore di autorizzazione negata per la risorsa. Per saperne di più, vedi Risolvere i problemi relativi ai modelli flessibili.

Crea una pipeline Dataflow

Questa sezione tratta la configurazione iniziale del connettore e fornisce esempi di integrazioni comuni con la funzionalità dimodifiche in tempo realee di Spanner.

Per seguire questi passaggi, devi disporre di un ambiente di sviluppo Java per Dataflow. Per ulteriori informazioni, vedi Creare una pipeline Dataflow utilizzando Java.

Crea un flusso di modifiche

Per informazioni dettagliate su come creare un flusso di modifiche, vedi Creare un flusso di modifiche. Per continuare con i passaggi successivi, devi disporre di un database Spanner con un flusso di modifiche configurato.

Concedere privilegi di controllo dell'accesso granulare

Se prevedi che gli utenti con controllo dell'accesso dell'accesso granulare eseguano il job Dataflow, assicurati che agli utenti venga concesso l'accesso a un ruolo del database che disponga del privilegio SELECT sullo stream di modifiche e del privilegio EXECUTE sulla funzione con valori di tabella dello stream di modifiche. Assicurati inoltre che l'entità specifica il ruolo database nella configurazione di SpannerIO o nel modello flessibile Dataflow.

Per saperne di più, consulta Informazioni sul controllo dell'accesso granulare.

Aggiungi il connettore SpannerIO come dipendenza

Il connettore Apache Beam SpannerIO incapsula la complessità dell'utilizzo degli modifiche in tempo reale direttamente tramite l'API Cloud Spanner, emettendo una PCollection di record di dati dello stream di modifiche per le fasi successive della pipeline.

Questi oggetti possono essere utilizzati in altre fasi della pipeline Dataflow dell'utente. L'integrazione del flusso di modifiche fa parte del connettore SpannerIO. Per poter utilizzare il connettore SpannerIO, la dipendenza deve essere aggiunta al file pom.xml:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam-version}</version> <!-- available from version 2.38.0 -->
</dependency>

Crea un database di metadati

Il connettore deve tenere traccia di ogni partizione durante l'esecuzione della pipeline Apache Beam. Conserva questi metadati in una tabella Spanner creata dal connettore durante l'inizializzazione. Specifichi il database in cui verrà creata questa tabella durante la configurazione del connettore.

Come descritto in Best practice per gli stream di modifiche, consigliamo di creare un nuovo database a questo scopo, anziché consentire al connettore di utilizzare il database dell'applicazione per archiviare la tabella dei metadati.

Il proprietario di un job Dataflow che utilizza il connettore SpannerIO deve disporre delle seguenti autorizzazioni IAM impostate con questo database di metadati:

  • spanner.databases.updateDdl
  • spanner.databases.beginReadOnlyTransaction
  • spanner.databases.beginOrRollbackReadWriteTransaction
  • spanner.databases.read
  • spanner.databases.select
  • spanner.databases.write
  • spanner.sessions.create
  • spanner.sessions.get

Configura il connettore

Il connettore dei modifiche in tempo reale di Spanner può essere configurato nel seguente modo:

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");    // Needed for fine-grained access control only

Timestamp startTime = Timestamp.now();
Timestamp endTime = Timestamp.ofTimeSecondsAndNanos(
   startTime.getSeconds() + (10 * 60),
   startTime.getNanos()
);

SpannerIO
  .readChangeStream()
  .withSpannerConfig(spannerConfig)
  .withChangeStreamName("my-change-stream")
  .withMetadataInstance("my-meta-instance-id")
  .withMetadataDatabase("my-meta-database-id")
  .withMetadataTable("my-meta-table-name")
  .withRpcPriority(RpcPriority.MEDIUM)
  .withInclusiveStartAt(startTime)
  .withInclusiveEndAt(endTime);

Di seguito sono riportate le descrizioni delle opzioni readChangeStream():

Configurazione Spanner (obbligatoria)

Utilizzato per configurare il progetto, l'istanza e il database da cui è stato creato e da cui deve essere eseguita la query sullo stream delle modifiche. Specifica anche, facoltativamente, il ruolo del database da utilizzare quando l'entità IAM che esegue il job Dataflow è un utente con controllo dell'accesso dell'accesso granulare. Il job assume questo ruolo del database per accedere allo stream di modifiche. Per maggiori informazioni, consulta l'articolo Informazioni sul controllo dell'accesso granulare.

Nome della modifica in tempo reale (obbligatorio)

Questo nome identifica in modo univoco lo stream delle modifiche. Il nome qui deve essere lo stesso utilizzato durante la creazione.

ID istanza metadati (facoltativo)

Questa è l'istanza in cui archiviare i metadati utilizzati dal connettore per controllare il consumo dei dati dell'API change stream.

ID database metadati (obbligatorio)

Questo è il database in cui archiviare i metadati utilizzati dal connettore per controllare il consumo dei dati dell'API change stream.

Nome della tabella dei metadati (facoltativo)

Deve essere utilizzato solo quando viene aggiornata una pipeline esistente.

Questo è il nome della tabella dei metadati preesistente da utilizzare dal connettore. Viene utilizzato dal connettore per archiviare i metadati per controllare il consumo dei dati dell'API change stream. Se questa opzione viene omessa, Spanner crea una nuova tabella con un nome generato durante l'inizializzazione del connettore.

Priorità RPC (facoltativo)

La priorità della richiesta da utilizzare per le query del flusso di modifiche. Se questo parametro viene omesso, verrà utilizzato high priority.

InclusiveStartAt (obbligatorio)

Le modifiche apportate a partire dal timestamp specificato vengono restituite al chiamante.

InclusiveEndAt (facoltativo)

Le modifiche fino al timestamp specificato vengono restituite al chiamante. Se questo parametro viene omesso, le modifiche verranno emesse a tempo indeterminato.

Aggiungere trasformazioni e sink per elaborare i dati delle modifiche

Una volta completati i passaggi precedenti, il connettore SpannerIO configurato è pronto a emettere una PCollection di oggetti DataChangeRecord. Consulta Trasformazioni e sink di esempio per diverse configurazioni di pipeline di esempio che elaborano questi dati di streaming in vari modi.

Tieni presente che i record del flusso di modifiche emessi dal connettore SpannerIO non sono ordinati. Questo perché le PCollection non forniscono alcuna garanzia di ordinamento. Se hai bisogno di uno stream ordinato, devi raggruppare e ordinare i record come trasformazioni nelle pipeline. Vedi Esempio: ordina per chiave. Puoi estendere questo esempio per ordinare i record in base a qualsiasi campo, ad esempio in base agli ID transazione.

Esempi di trasformazioni e sink

Puoi definire le tue trasformazioni e specificare i sink in cui scrivere i dati. La documentazione di Apache Beam fornisce una miriade di trasformazioni che possono essere applicate, nonché connettori I/O pronti all'uso per scrivere i dati in sistemi esterni.

Esempio: ordina per chiave

Questo esempio di codice genera record di modifica dei dati ordinati in base al timestamp di commit e raggruppati per chiavi primarie utilizzando il connettore Dataflow.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new BreakRecordByModFn()))
  .apply(ParDo.of(new KeyByIdFn()))
  .apply(ParDo.of(new BufferKeyUntilOutputTimestamp()))
  // Subsequent processing goes here

Questo esempio di codice utilizza stati e timer per memorizzare nel buffer i record per ogni chiave e imposta il tempo di scadenza del timer su un periodo di tempo configurato dall'utente T in futuro (definito nella funzione BufferKeyUntilOutputTimestamp). Quando il watermark di Dataflow supera l'ora T, questo codice svuota tutti i record nel buffer con timestamp inferiore a T, li ordina in base al timestamp di commit e restituisce una coppia chiave-valore in cui:

  • La chiave è la chiave di input, ovvero la chiave primaria sottoposta ad hashing in un array di bucket di dimensione 1000.
  • Il valore è costituito dai record di modifica dei dati ordinati memorizzati nel buffer per la chiave.

Per ogni chiave, abbiamo le seguenti garanzie:

  • I timer vengono attivati nell'ordine del timestamp di scadenza.
  • È garantito che le fasi downstream ricevano gli elementi nello stesso ordine in cui sono stati prodotti.

Ad esempio, con una chiave di valore 100, il timer si attiva rispettivamente alle ore T1 e T10, producendo un bundle di record di modifica dei dati a ogni timestamp. Poiché i record di modifica dei dati generati alle ore T1 sono stati prodotti prima di quelli generati alle ore T10, è garantito che i record di modifica dei dati generati alle ore T1 vengano ricevuti dalla fase successiva prima di quelli generati alle ore T10. Questo meccanismo ci aiuta a garantire un ordine rigoroso dei timestamp di commit per chiave primaria per l'elaborazione downstream.

Questo processo si ripeterà fino al termine della pipeline e all'elaborazione di tutti i record di modifica dei dati (o si ripeterà all'infinito se non viene specificato un orario di fine).

Tieni presente che questo esempio di codice utilizza stati e timer, anziché finestre, per eseguire l'ordinamento per chiave. Il motivo è che non è garantito che le finestre vengano elaborate in ordine. Ciò significa che le finestre meno recenti possono essere elaborate in un secondo momento rispetto a quelle più recenti, il che potrebbe comportare un'elaborazione fuori ordine.

BreakRecordByModFn

Ogni record di modifica dei dati può contenere diverse modifiche. Ogni modifica rappresenta un inserimento, un aggiornamento o un'eliminazione di un singolo valore della chiave primaria. Questa funzione suddivide ogni record di modifica dei dati in record di modifica dei dati separati, uno per ogni mod.

private static class BreakRecordByModFn extends DoFn<DataChangeRecord,
                                                     DataChangeRecord>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record, OutputReceiver<DataChangeRecord>
    outputReceiver) {
    record.getMods().stream()
      .map(
          mod ->
              new DataChangeRecord(
                  record.getPartitionToken(),
                  record.getCommitTimestamp(),
                  record.getServerTransactionId(),
                  record.isLastRecordInTransactionInPartition(),
                  record.getRecordSequence(),
                  record.getTableName(),
                  record.getRowType(),
                  Collections.singletonList(mod),
                  record.getModType(),
                  record.getValueCaptureType(),
                  record.getNumberOfRecordsInTransaction(),
                  record.getNumberOfPartitionsInTransaction(),
                  record.getTransactionTag(),
                  record.isSystemTransaction(),
                  record.getMetadata()))
      .forEach(outputReceiver::output);
  }
}

KeyByIdFn

Questa funzione accetta un DataChangeRecord e restituisce un DataChangeRecord indicizzato dalla chiave primaria Spanner sottoposta ad hashing a un valore intero.

private static class KeyByIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  // NUMBER_OF_BUCKETS should be configured by the user to match their key cardinality
  // Here, we are choosing to hash the Spanner primary keys to a bucket index, in order to have a deterministic number
  // of states and timers for performance purposes.
  // Note that having too many buckets might have undesirable effects if it results in a low number of records per bucket
  // On the other hand, having too few buckets might also be problematic, since many keys will be contained within them.
  private static final int NUMBER_OF_BUCKETS = 1000;

  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    int hashCode = (int) record.getMods().get(0).getKeysJson().hashCode();
    // Hash the received keys into a bucket in order to have a
    // deterministic number of buffers and timers.
    String bucketIndex = String.valueOf(hashCode % NUMBER_OF_BUCKETS);

    outputReceiver.output(KV.of(bucketIndex, record));
  }
}

BufferKeyUntilOutputTimestamp

I timer e i buffer sono per chiave. Questa funzione memorizza nel buffer ogni record di modifica dei dati finché il watermark non supera il timestamp in corrispondenza del quale vogliamo restituire i record di modifica dei dati memorizzati nel buffer.

Questo codice utilizza un timer di loop per determinare quando svuotare il buffer:

  1. Quando rileva per la prima volta un record di modifica dei dati per una chiave, imposta il timer in modo che si attivi all'ora di commit del record di modifica dei dati + incrementIntervalSeconds (un'opzione configurabile dall'utente).
  2. Quando il timer si attiva, aggiunge a recordsToOutput tutti i record di modifica dei dati nel buffer con timestamp inferiore alla durata del timer. Se il buffer contiene record di modifica dei dati il cui timestamp è maggiore o uguale all'ora di scadenza del timer, li aggiunge nuovamente al buffer anziché restituirli. Quindi, imposta il timer successivo sull'ora di scadenza del timer corrente più incrementIntervalInSeconds.
  3. Se recordsToOutput non è vuoto, la funzione ordina i record di modifica dei dati in recordsToOutput in base al timestamp di commit e all'ID transazione, quindi li restituisce.
private static class BufferKeyUntilOutputTimestamp extends
    DoFn<KV<String, DataChangeRecord>, KV<String, Iterable<DataChangeRecord>>>  {
  private static final Logger LOG =
      LoggerFactory.getLogger(BufferKeyUntilOutputTimestamp.class);

  private final long incrementIntervalInSeconds = 2;

  private BufferKeyUntilOutputTimestamp(long incrementIntervalInSeconds) {
    this.incrementIntervalInSeconds = incrementIntervalInSeconds;
  }

  @SuppressWarnings("unused")
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("keyString")
  private final StateSpec<ValueState<String>> keyString =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(
      @Element KV<String, DataChangeRecord> element,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    buffer.add(element.getValue());

    // Only set the timer if this is the first time we are receiving a data change
    // record with this key.
    String elementKey = keyString.read();
    if (elementKey == null) {
      Instant commitTimestamp =
          new Instant(element.getValue().getCommitTimestamp().toSqlTimestamp());
      Instant outputTimestamp =
          commitTimestamp.plus(Duration.standardSeconds(incrementIntervalInSeconds));
      timer.set(outputTimestamp);
      keyString.write(element.getKey());
    }
  }

  @OnTimer("timer")
  public void onExpiry(
      OnTimerContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @TimerId("timer") Timer timer,
      @StateId("keyString") ValueState<String> keyString) {
    if (!buffer.isEmpty().read()) {
      String elementKey = keyString.read();

      final List<DataChangeRecord> records =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .collect(Collectors.toList());
      buffer.clear();

      List<DataChangeRecord> recordsToOutput = new ArrayList<>();
      for (DataChangeRecord record : records) {
        Instant recordCommitTimestamp =
            new Instant(record.getCommitTimestamp().toSqlTimestamp());
        final String recordString =
            record.getMods().get(0).getNewValuesJson().isEmpty()
                ? "Deleted record"
                : record.getMods().get(0).getNewValuesJson();
        // When the watermark passes time T, this means that all records with
        // event time < T have been processed and successfully committed. Since the
        // timer fires when the watermark passes the expiration time, we should
        // only output records with event time < expiration time.
        if (recordCommitTimestamp.isBefore(context.timestamp())) {
          LOG.info(
             "Outputting record with key {} and value {} at expiration " +
             "timestamp {}",
              elementKey,
              recordString,
              context.timestamp().toString());
          recordsToOutput.add(record);
        } else {
          LOG.info(
              "Expired at {} but adding record with key {} and value {} back to " +
              "buffer due to commit timestamp {}",
              context.timestamp().toString(),
              elementKey,
              recordString,
              recordCommitTimestamp.toString());
          buffer.add(record);
        }
      }

      // Output records, if there are any to output.
      if (!recordsToOutput.isEmpty()) {
        // Order the records in place, and output them. The user would need
        // to implement DataChangeRecordComparator class that sorts the
        // data change records by commit timestamp and transaction ID.
        Collections.sort(recordsToOutput, new DataChangeRecordComparator());
        context.outputWithTimestamp(
            KV.of(elementKey, recordsToOutput), context.timestamp());
        LOG.info(
            "Expired at {}, outputting records for key {}",
            context.timestamp().toString(),
            elementKey);
      } else {
        LOG.info("Expired at {} with no records", context.timestamp().toString());
      }
    }

    Instant nextTimer = context.timestamp().plus(Duration.standardSeconds(incrementIntervalInSeconds));
    if (buffer.isEmpty() != null && !buffer.isEmpty().read()) {
      LOG.info("Setting next timer to {}", nextTimer.toString());
      timer.set(nextTimer);
    } else {
      LOG.info(
          "Timer not being set since the buffer is empty: ");
      keyString.clear();
    }
  }
}

Ordinamento delle transazioni

Questa pipeline può essere modificata per ordinare in base all'ID transazione e al timestamp di commit. A questo scopo, memorizza i record nel buffer per ogni coppia ID transazione / timestamp di commit, anziché per ogni chiave Spanner. Ciò richiede la modifica del codice in KeyByIdFn.

Esempio: assemblaggio delle transazioni

Questo esempio di codice legge i record di modifica dei dati, assembla tutti i record di modifica dei dati appartenenti alla stessa transazione in un unico elemento e restituisce l'elemento. Tieni presente che le transazioni generate da questo codice campione non sono ordinate in base al timestamp di commit.

Questo esempio di codice utilizza i buffer per assemblare le transazioni dai record di modifica dei dati. Quando riceve per la prima volta un record di modifica dei dati appartenente a una transazione, legge il campo numberOfRecordsInTransaction nel record di modifica dei dati, che descrive il numero previsto di record di modifica dei dati appartenenti a quella transazione. Memorizza nel buffer i record di modifica dei dati appartenenti a quella transazione finché il numero di record memorizzati nel buffer non corrisponde a numberOfRecordsInTransaction, dopodiché restituisce i record di modifica dei dati raggruppati.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new KeyByTransactionIdFn()))
  .apply(ParDo.of(new TransactionBoundaryFn()))
  // Subsequent processing goes here

KeyByTransactionIdFn

Questa funzione accetta un DataChangeRecord e restituisce un DataChangeRecord con chiave l'ID transazione.

private static class KeyByTransactionIdFn extends DoFn<DataChangeRecord, KV<String, DataChangeRecord>>  {
  @ProcessElement
  public void processElement(
      @Element DataChangeRecord record,
      OutputReceiver<KV<String, DataChangeRecord>> outputReceiver) {
    outputReceiver.output(KV.of(record.getServerTransactionId(), record));
  }
}

TransactionBoundaryFn

I buffer TransactionBoundaryFn hanno ricevuto coppie chiave-valore di {TransactionId, DataChangeRecord} da KeyByTransactionIdFn e le memorizzano in gruppi in base a TransactionId. Quando il numero di record memorizzati nel buffer è uguale al numero di record contenuti nell'intera transazione, questa funzione ordina gli oggetti DataChangeRecord nel gruppo in base alla sequenza dei record e restituisce una coppia chiave-valore di {CommitTimestamp, TransactionId}, Iterable<DataChangeRecord>.

Qui si presuppone che SortKey sia una classe definita dall'utente che rappresenta una coppia {CommitTimestamp, TransactionId}. Per maggiori informazioni su SortKey, consulta l'implementazione di esempio.

private static class TransactionBoundaryFn extends DoFn<KV<String, DataChangeRecord>, KV<SortKey, Iterable<DataChangeRecord>>>  {
  @StateId("buffer")
  private final StateSpec<BagState<DataChangeRecord>> buffer = StateSpecs.bag();

  @StateId("count")
  private final StateSpec<ValueState<Integer>> countState = StateSpecs.value();

  @ProcessElement
  public void process(
      ProcessContext context,
      @StateId("buffer") BagState<DataChangeRecord> buffer,
      @StateId("count") ValueState<Integer> countState) {
    final KV<String, DataChangeRecord> element = context.element();
    final DataChangeRecord record = element.getValue();

    buffer.add(record);
    int count = (countState.read() != null ? countState.read() : 0);
    count = count + 1;
    countState.write(count);

    if (count == record.getNumberOfRecordsInTransaction()) {
      final List<DataChangeRecord> sortedRecords =
          StreamSupport.stream(buffer.read().spliterator(), false)
              .sorted(Comparator.comparing(DataChangeRecord::getRecordSequence))
              .collect(Collectors.toList());

      final Instant commitInstant =
          new Instant(sortedRecords.get(0).getCommitTimestamp().toSqlTimestamp()
              .getTime());
      context.outputWithTimestamp(
          KV.of(
              new SortKey(sortedRecords.get(0).getCommitTimestamp(),
                          sortedRecords.get(0).getServerTransactionId()),
              sortedRecords),
          commitInstant);
      buffer.clear();
      countState.clear();
    }
  }
}

Esempio: filtrare per tag transazione

Quando una transazione che modifica i dati utente viene taggata, il tag corrispondente e il relativo tipo vengono memorizzati come parte di DataChangeRecord. Questi esempi mostrano come filtrare i record del flusso di modifiche in base ai tag di transazione definiti dall'utente e ai tag di sistema:

Filtro dei tag definiti dall'utente per my-tx-tag:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           !record.isSystemTransaction()
           && record.getTransactionTag().equalsIgnoreCase("my-tx-tag")))
  // Subsequent processing goes here

Filtro dei tag di sistema/controllo TTL:

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(Filter.by(record ->
           record.isSystemTransaction()
           && record.getTransactionTag().equals("RowDeletionPolicy")))
  // Subsequent processing goes here

Esempio: recupera riga completa

Questo esempio funziona con una tabella Spanner denominata Singer con la seguente definizione:

CREATE TABLE Singers (
  SingerId INT64 NOT NULL,
  FirstName STRING(1024),
  LastName STRING(1024)
) PRIMARY KEY (SingerId);

In modalità di acquisizione dei valori OLD_AND_NEW_VALUES predefinita dei modifiche in tempo reale, quando viene aggiornata una riga Spanner, il record di modifica dei dati ricevuto conterrà solo le colonne modificate. Le colonne monitorate ma invariate non verranno incluse nel record. La chiave primaria della mod può essere utilizzata per eseguire una lettura dello snapshot di Spanner al timestamp di commit del record di modifica dei dati per recuperare le colonne invariate o anche l'intera riga.

Tieni presente che la policy di conservazione del database potrebbe dover essere modificata con un valore maggiore o uguale alla policy di conservazione dello stream delle modifiche affinché la lettura dello snapshot vada a buon fine.

Tieni inoltre presente che l'utilizzo del tipo di acquisizione del valore NEW_ROW è il modo consigliato e più efficiente per farlo, poiché restituisce tutte le colonne monitorate della riga per impostazione predefinita e non richiede una lettura aggiuntiva dello snapshot in Spanner.

SpannerConfig spannerConfig = SpannerConfig
   .create()
   .withProjectId("my-project-id")
   .withInstanceId("my-instance-id")
   .withDatabaseId("my-database-id")
   .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
   .apply(SpannerIO
       .readChangeStream()
       .withSpannerConfig(spannerConfig)
       // Assume we have a change stream "my-change-stream" that watches Singers table.
       .withChangeStreamName("my-change-stream")
       .withMetadataInstance("my-metadata-instance-id")
       .withMetadataDatabase("my-metadata-database-id")
       .withInclusiveStartAt(Timestamp.now()))
   .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
   // Subsequent processing goes here

ToFullRowJsonFn

Questa trasformazione eseguirà una lettura obsoleta al timestamp di commit di ogni record ricevuto e mapperà l'intera riga in JSON.

public class ToFullRowJsonFn extends DoFn<DataChangeRecord, String> {
 // Since each instance of this DoFn will create its own session pool and will
 // perform calls to Spanner sequentially, we keep the number of sessions in
 // the pool small. This way, we avoid wasting resources.
 private static final int MIN_SESSIONS = 1;
 private static final int MAX_SESSIONS = 5;
 private final String projectId;
 private final String instanceId;
 private final String databaseId;

 private transient DatabaseClient client;
 private transient Spanner spanner;

 public ToFullRowJsonFn(SpannerConfig spannerConfig) {
   this.projectId = spannerConfig.getProjectId().get();
   this.instanceId = spannerConfig.getInstanceId().get();
   this.databaseId = spannerConfig.getDatabaseId().get();
 }

 @Setup
 public void setup() {
   SessionPoolOptions sessionPoolOptions = SessionPoolOptions
      .newBuilder()
      .setMinSessions(MIN_SESSIONS)
      .setMaxSessions(MAX_SESSIONS)
      .build();
   SpannerOptions options = SpannerOptions
       .newBuilder()
       .setProjectId(projectId)
       .setSessionPoolOption(sessionPoolOptions)
       .build();
   DatabaseId id = DatabaseId.of(projectId, instanceId, databaseId);
   spanner = options.getService();
   client = spanner.getDatabaseClient(id);
 }

 @Teardown
 public void teardown() {
   spanner.close();
 }

 @ProcessElement
 public void process(
   @Element DataChangeRecord element,
   OutputReceiver<String> output) {
   com.google.cloud.Timestamp commitTimestamp = element.getCommitTimestamp();
   element.getMods().forEach(mod -> {
     JSONObject keysJson = new JSONObject(mod.getKeysJson());
     JSONObject newValuesJson = new JSONObject(mod.getNewValuesJson());
     ModType modType = element.getModType();
     JSONObject jsonRow = new JSONObject();
     long singerId = keysJson.getLong("SingerId");
     jsonRow.put("SingerId", singerId);
     if (modType == ModType.INSERT) {
       // For INSERT mod, get non-primary key columns from mod.
       jsonRow.put("FirstName", newValuesJson.get("FirstName"));
       jsonRow.put("LastName", newValuesJson.get("LastName"));
     } else if (modType == ModType.UPDATE) {
       // For UPDATE mod, get non-primary key columns by doing a snapshot read using the primary key column from mod.
       try (ResultSet resultSet = client
         .singleUse(TimestampBound.ofReadTimestamp(commitTimestamp))
         .read(
           "Singers",
           KeySet.singleKey(com.google.cloud.spanner.Key.of(singerId)),
             Arrays.asList("FirstName", "LastName"))) {
         if (resultSet.next()) {
           jsonRow.put("FirstName", resultSet.isNull("FirstName") ?
             JSONObject.NULL : resultSet.getString("FirstName"));
           jsonRow.put("LastName", resultSet.isNull("LastName") ?
             JSONObject.NULL : resultSet.getString("LastName"));
         }
       }
     } else {
       // For DELETE mod, there is nothing to do, as we already set SingerId.
     }

     output.output(jsonRow.toString());
   });
 }
}

Questo codice crea un client di database Spanner per eseguire il recupero completo delle righe e configura il pool di sessioni in modo che contenga solo poche sessioni, eseguendo le letture in un'istanza di ToFullReowJsonFn in sequenza. Dataflow si assicura di generare molte istanze di questa funzione, ognuna con il proprio pool di client.

Esempio: da Spanner a Pub/Sub

In questo scenario, il chiamante trasmette i record a Pub/Sub il più rapidamente possibile, senza raggruppamenti o aggregazioni. Questa è una buona soluzione per attivare l'elaborazione downstream, ad esempio lo streaming di tutte le nuove righe inserite in una tabella Spanner in Pub/Sub per un'ulteriore elaborazione.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(PubsubIO.writeStrings().to("my-topic"));

Tieni presente che il sink Pub/Sub può essere configurato per garantire la semantica exactly-once.

Esempio: da Spanner a Cloud Storage

In questo scenario, il chiamante raggruppa tutti i record all'interno di una determinata finestra e salva il gruppo in file Cloud Storage separati. È una buona soluzione per l'analisi e l'archiviazione point-in-time, che è indipendente dal periodo di conservazione di Spanner.

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(SpannerConfig
      .create()
      .withProjectId("my-project-id")
      .withInstanceId("my-instance-id")
      .withDatabaseId("my-database-id")
      .withDatabaseRole("my-database-role"))    // Needed for fine-grained access control only
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(MapElements.into(TypeDescriptors.strings()).via(Object::toString))
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
  .apply(TextIO
    .write()
    .to("gs://my-bucket/change-stream-results-")
    .withSuffix(".txt")
    .withWindowedWrites()
    .withNumShards(1));

Tieni presente che il sink Cloud Storage fornisce la semantica at-least-once per impostazione predefinita. Con un'elaborazione aggiuntiva, può essere modificato per avere una semantica exactly-once.

Forniamo anche un modello Dataflow per questo caso d'uso: consulta Connettere modifiche in tempo reale a Cloud Storage.

Esempio: da Spanner a BigQuery (tabella del registro)

Qui, il chiamante trasmette i record delle modifiche in BigQuery. Ogni record di modifica dei dati viene visualizzato come una riga in BigQuery. È una buona soluzione per l'analisi. Questo codice utilizza le funzioni definite in precedenza, nella sezione Recupera riga completa, per recuperare la riga completa del record e scriverla in BigQuery.

SpannerConfig spannerConfig = SpannerConfig
  .create()
  .withProjectId("my-project-id")
  .withInstanceId("my-instance-id")
  .withDatabaseId("my-database-id")
  .withDatabaseRole("my-database-role");   // Needed for fine-grained access control only

pipeline
  .apply(SpannerIO
    .readChangeStream()
    .withSpannerConfig(spannerConfig)
    .withChangeStreamName("my-change-stream")
    .withMetadataInstance("my-metadata-instance-id")
    .withMetadataDatabase("my-metadata-database-id")
    .withInclusiveStartAt(Timestamp.now()))
  .apply(ParDo.of(new ToFullRowJsonFn(spannerConfig)))
  .apply(BigQueryIO
    .<String>write()
    .to("my-bigquery-table")
    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(Write.WriteDisposition.WRITE_APPEND)
    .withSchema(new TableSchema().setFields(Arrays.asList(
      new TableFieldSchema()
        .setName("SingerId")
        .setType("INT64")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("FirstName")
        .setType("STRING")
        .setMode("REQUIRED"),
      new TableFieldSchema()
        .setName("LastName")
        .setType("STRING")
        .setMode("REQUIRED")
    )))
    .withAutoSharding()
    .optimizedWrites()
    .withFormatFunction((String element) -> {
      ObjectMapper objectMapper = new ObjectMapper();
      JsonNode jsonNode = null;
      try {
        jsonNode = objectMapper.readTree(element);
      } catch (IOException e) {
        e.printStackTrace();
      }
      return new TableRow()
        .set("SingerId", jsonNode.get("SingerId").asInt())
        .set("FirstName", jsonNode.get("FirstName").asText())
        .set("LastName", jsonNode.get("LastName").asText());
    }
  )
);

Tieni presente che il sink BigQuery fornisce la semantica almeno una volta per impostazione predefinita. Con un'elaborazione aggiuntiva, può essere modificato per avere una semantica exactly-once.

Forniamo anche un modello Dataflow per questo caso d'uso. Consulta Connettere modifiche in tempo reale a BigQuery.

Monitorare una pipeline

Sono disponibili due classi di metriche per monitorare una pipeline Dataflow di stream delle modifiche.

Metriche Dataflow standard

Dataflow fornisce diverse metriche per garantire l'integrità del job, come l'aggiornamento dei dati, il ritardo di sistema, la velocità effettiva del job, l'utilizzo della CPU del worker e altro ancora. Per ulteriori informazioni, consulta Utilizzo di Monitoring per le pipeline Dataflow.

Per le pipeline di modifiche in tempo reale, è necessario prendere in considerazione due metriche principali: la latenza di sistema e la freschezza dei dati.

La latenza di sistema indica la durata massima attuale (in secondi) per cui un elemento di dati viene elaborato o è in attesa di elaborazione.

L'aggiornamento dei dati mostrerà la quantità di tempo tra ora (in tempo reale) e il watermark di output. La filigrana di output dell'ora T indica che tutti gli elementi con un'ora evento (rigorosamente) precedente a T sono stati elaborati per il calcolo. In altre parole, l'aggiornamento dei dati misura il grado di attualità della pipeline in relazione all'elaborazione degli eventi ricevuti.

Se la pipeline non dispone di risorse sufficienti, puoi notare l'effetto in queste due metriche. La latenza del sistema aumenterà perché gli elementi dovranno attendere più a lungo prima di essere elaborati. Aumenterà anche l'aggiornamento dei dati, perché la pipeline non sarà in grado di tenere il passo con la quantità di dati ricevuti.

Metriche personalizzate del flusso di modifiche

Queste metriche sono esposte in Cloud Monitoring e includono:

  • Latenza raggruppata (istogramma) tra il commit di un record in Spanner e la sua emissione in una PCollection da parte del connettore. Questa metrica può essere utilizzata per rilevare eventuali problemi di prestazioni (latenza) della pipeline.
  • Numero totale di record di dati letti. Si tratta di un'indicazione generale del numero di record emessi dal connettore. Questo numero deve aumentare sempre, rispecchiando l'andamento delle scritture nel database Spanner sottostante.
  • Numero di partizioni in fase di lettura. Devono sempre essere lette delle partizioni. Se questo numero è zero, indica che si è verificato un errore nella pipeline.
  • Numero totale di query emesse durante l'esecuzione del connettore. Si tratta di un'indicazione generale delle query di modifiche in tempo reale eseguite sull'istanza Spanner durante l'esecuzione della pipeline. Può essere utilizzato per ottenere una stima del carico dal connettore al database Spanner.

Aggiornare una pipeline esistente

È possibile aggiornare una pipeline in esecuzione che utilizza il connettore SpannerIO per elaborare gli modifiche in tempo reale se i controlli di compatibilità dei job vengono superati. Per farlo, devi impostare esplicitamente il parametro del nome della tabella dei metadati del nuovo job durante l'aggiornamento. Utilizza il valore dell'opzione della pipeline metadataTable del job che stai aggiornando.

Se utilizzi un modello Dataflow fornito da Google, imposta il nome della tabella utilizzando il parametro spannerMetadataTableName. Puoi anche modificare il job esistente per utilizzare in modo esplicito la tabella dei metadati con il metodo withMetadataTable(your-metadata-table-name) nella configurazione del connettore. Una volta completata questa operazione, puoi seguire le istruzioni riportate in Avvio del job di sostituzione della documentazione di Dataflow per aggiornare un job in esecuzione.

Best practice per modifiche in tempo reale e Dataflow

Di seguito sono riportate alcune best practice per la creazione di connessioni ai modifiche in tempo reale utilizzando Dataflow.

Utilizzare un database di metadati separato

Consigliamo di creare un database separato da utilizzare per l'archiviazione dei metadati del connettore SpannerIO, anziché configurarlo per l'utilizzo del database dell'applicazione.

Per saperne di più, consulta Valuta la possibilità di utilizzare un database di metadati separato.

Dimensiona il cluster

Una regola generale per un numero iniziale di worker in un job di modifiche in tempo reale Spanner è un worker per 1000 scritture al secondo. Tieni presente che questa stima può variare a seconda di diversi fattori, come le dimensioni di ogni transazione, il numero di record di stream di modifiche prodotti da una singola transazione e altre trasformazioni, aggregazioni o sink utilizzati nella pipeline.

Dopo l'allocazione iniziale delle risorse, è importante tenere traccia delle metriche menzionate in Monitorare una pipeline per garantire che la pipeline sia integra. Ti consigliamo di sperimentare una dimensione iniziale del pool di worker e monitorare il modo in cui la pipeline gestisce il carico, aumentando il numero di nodi, se necessario. L'utilizzo della CPU è una metrica chiave per verificare se il carico è corretto e se sono necessari più nodi.

Limitazioni note

Esistono alcune limitazioni note quando si utilizzano le modifiche in tempo reale di Spanner con Dataflow:

Scalabilità automatica

Il supporto della scalabilità automatica per le pipeline che includono SpannerIO.readChangeStream richiede Apache Beam 2.39.0 o versioni successive.

Se utilizzi una versione di Apache Beam precedente a 2.39.0, le pipeline che includono SpannerIO.readChangeStream devono specificare esplicitamente l'algoritmo di scalabilità automatica come NONE, come descritto in Scalabilità automatica orizzontale.

Per scalare manualmente una pipeline Dataflow anziché utilizzare la scalabilità automatica, consulta Scalare manualmente una pipeline di streaming.

Runner V2

Il connettore dei modifiche in tempo reale di Spanner richiede Dataflow Runner v2. Questo valore deve essere specificato manualmente durante l'esecuzione, altrimenti verrà generato un errore. Puoi specificare Runner V2 configurando il job con --experiments=use_unified_worker,use_runner_v2.

Snapshot

Il connettore Spanner modifiche in tempo reale non supporta gli snapshot Dataflow.

Svuotamento in corso

Il connettore di modifiche in tempo reale Spanner non supporta lo svuotamento di un job. È possibile annullare solo un job esistente.

Puoi anche aggiornare una pipeline esistente senza doverla interrompere.

OpenCensus

Per utilizzare OpenCensus per monitorare la pipeline, specifica la versione 0.28.3 o successive.

NullPointerException all'avvio della pipeline

Un bug nella versione 2.38.0 di Apache Beam può causare un NullPointerException all'avvio della pipeline in determinate condizioni. In questo modo, il job non verrà avviato e verrà visualizzato il seguente messaggio di errore:

java.lang.NullPointerException: null value in entry: Cloud Storage_PROJECT_ID=null

Per risolvere questo problema, utilizza Apache Beam versione 2.39.0 o successive oppure specifica manualmente la versione di beam-sdks-java-core come 2.37.0:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-core</artifactId>
  <version>2.37.0</version>
</dependency>

Ulteriori informazioni