Leggere un flusso di modifiche con Java

La libreria client di Cloud Bigtable per Java fornisce metodi di basso livello per l'elaborazione dei record di modifica dei dati. Tuttavia, nella maggior parte dei casi, ti consigliamo di eseguire lo streaming delle modifiche con Dataflow anziché utilizzare i metodi descritti in questa pagina, perché Dataflow gestisce le divisioni e le unioni delle partizioni.

Prima di iniziare

Prima di leggere un flusso di modifiche in tempo reale con Java, assicurati di conoscere la panoramica dei flussi di modifiche in tempo reale. Quindi completa i seguenti prerequisiti.

Configura l'autenticazione

Per utilizzare gli esempi di Java in questa pagina in un ambiente di sviluppo locale, installa e inizializza gcloud CLI, quindi configura Credenziali predefinite dell'applicazione con le tue credenziali utente.

  1. Installa Google Cloud CLI.

  2. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere a gcloud CLI con la tua identità federata.

  3. Se utilizzi una shell locale, crea le credenziali di autenticazione locali per il tuo account utente:

    gcloud auth application-default login

    Non devi eseguire questa operazione se utilizzi Cloud Shell.

    Se viene restituito un errore di autenticazione e utilizzi un provider di identità (IdP) esterno, verifica di aver acceduto a gcloud CLI con la tua identità federata.

Per saperne di più, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

Per informazioni sulla configurazione dell'autenticazione per un ambiente di produzione, consulta Configura le credenziali predefinite dell'applicazione per il codice in esecuzione su Google Cloud .

Abilita un flusso di modifiche

Devi abilitare un flusso di modifiche su una tabella prima di poterlo leggere. Puoi anche creare una nuova tabella con un flusso di modifiche abilitato.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per leggere un flusso di modifiche di Bigtable, chiedi all'amministratore di concederti il seguente ruolo IAM.

  • Amministratore Bigtable (roles/bigtable.admin) sull'istanza Bigtable che contiene la tabella da cui prevedi di eseguire lo streaming delle modifiche

Aggiungi la libreria client Java come dipendenza

Aggiungi codice simile al seguente al file pom.xml di Maven. Sostituisci VERSION con la versione della libreria client che stai utilizzando. La versione deve essere 2.21.0 o successive.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Determinare le partizioni della tabella

Per iniziare a effettuare richieste ReadChangeStream, devi conoscere le partizioni della tabella. Questo può essere determinato utilizzando il metodo GenerateInitialChangeStreamPartitions. L'esempio seguente mostra come utilizzare questo metodo per ottenere un flusso di ByteStringRanges che rappresenta ogni partizione della tabella. Ogni ByteStringRange contiene la chiave di inizio e di fine di una partizione.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Elaborare le modifiche per ogni partizione

Puoi quindi elaborare le modifiche per ogni partizione utilizzando il metodo ReadChangeStream. Ecco un esempio di come aprire un flusso per una partizione, a partire dall'ora corrente.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery accetta i seguenti argomenti:

  • Partizione di flusso (obbligatorio): la partizione da cui eseguire lo streaming delle modifiche
  • Una delle seguenti opzioni:
    • Ora di inizio: timestamp di commit da cui iniziare a elaborare le modifiche
    • Token di continuazione: token che rappresentano una posizione da cui riprendere lo streaming
  • Ora di fine (facoltativo): timestamp di commit per interrompere l'elaborazione delle modifiche quando viene raggiunto. Se non fornisci un valore, il flusso continua a leggere.
  • Durata heartbeat (facoltativo): frequenza dei messaggi heartbeat quando non ci sono nuove modifiche (il valore predefinito è di cinque secondi)

Formato dei record del flusso di modifiche

Un record del flusso di modifiche restituito è uno dei tre tipi di risposta:

  • ChangeStreamMutation : un messaggio che rappresenta un record di modifica dei dati.

  • CloseStream : un messaggio che indica che il client deve interrompere la lettura dal flusso.

    • Stato: indica il motivo della chiusura del flusso. Una delle seguenti opzioni:
      • OK : l'ora di fine è stata raggiunta per la partizione specificata
      • OUT_OF_RANGE - la partizione specificata non esiste più, il che significa che sono state eseguite divisioni o unioni in questa partizione. Per ogni nuova partizione sarà necessario creare una nuova richiesta ReadChangeStream.
    • NewPartitions : fornisce le informazioni di partizionamento aggiornate sulle risposte OUT_OF_RANGE.
    • ChangeStreamContinuationTokens : elenco di token utilizzati per riprendere nuove richieste ReadChangeStream dalla stessa posizione. Uno per NewPartition.
  • Heartbeat : un messaggio periodico con informazioni che possono essere utilizzate per controllare lo stato del flusso.

    • EstimatedLowWatermark : stima del limite inferiore per la partizione specificata
    • ContinuationToken : token per riprendere lo streaming della partizione specificata dalla posizione corrente.

Contenuti dei record di modifica dei dati

Per informazioni sui record del flusso di modifiche, vedi Contenuto di un record di modifica dei dati.

Gestire le modifiche alle partizioni

Quando le partizioni di una tabella cambiano, le richieste ReadChangeStream restituiscono un messaggio CloseStream con le informazioni necessarie per riprendere lo streaming dalle nuove partizioni.

Per una divisione, questo conterrà più nuove partizioni e un corrispondente ContinuationToken per ogni partizione. Per riprendere lo streaming delle nuove partizioni dalla stessa posizione, effettua una nuova richiesta ReadChangeStream per ogni nuova partizione con il token corrispondente.

Ad esempio, se esegui lo streaming della partizione [A,C) e questa viene suddivisa in due partizioni, [A,B) e [B,C), puoi prevedere la seguente sequenza di eventi:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Per riprendere lo streaming di ogni partizione dalla stessa posizione, invia le seguenti richieste ReadChangeStreamQuery:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Per un'unione, per riprendere dalla stessa partizione, devi inviare una nuova richiesta ReadChangeStream contenente ogni token delle partizioni unite.

Ad esempio, se esegui lo streaming di due partizioni, [A,B) e [B,C), e queste vengono unite nella partizione [A,C), puoi prevedere la seguente sequenza di eventi:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Per riprendere lo streaming della partizione [A, C) dalla stessa posizione, invia una ReadChangeStreamQuery simile alla seguente:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Passaggi successivi