Leia uma stream de alterações com Java

A biblioteca de cliente do Cloud Bigtable para Java oferece métodos de baixo nível para processar registos de alterações de dados. No entanto, na maioria dos casos, recomendamos que transmita alterações com o Dataflow em vez de usar os métodos descritos nesta página, porque o Dataflow processa as divisões e as uniões de partições por si.

Antes de começar

Antes de ler um fluxo de alterações com Java, certifique-se de que conhece a vista geral dos fluxos de alterações. Em seguida, conclua os seguintes pré-requisitos.

Configure a autenticação

Para usar os Java exemplos nesta página num ambiente de desenvolvimento local, instale e inicialize a CLI gcloud e, em seguida, configure as Credenciais predefinidas da aplicação com as suas credenciais de utilizador.

    Instale a CLI Google Cloud.

    Se estiver a usar um fornecedor de identidade (IdP) externo, primeiro tem de iniciar sessão na CLI gcloud com a sua identidade federada.

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Para mais informações, consulte Set up authentication for a local development environment.

Para informações sobre como configurar a autenticação para um ambiente de produção, consulte Set up Application Default Credentials for code running on Google Cloud.

Ative uma stream de alterações

Tem de ativar um fluxo de alterações numa tabela antes de o poder ler. Também pode criar uma nova tabela com uma stream de alterações ativada.

Funções necessárias

Para receber as autorizações de que precisa para ler um fluxo de alterações do Bigtable, peça ao seu administrador que lhe conceda a seguinte função de IAM.

  • Administrador do Bigtable (roles/bigtable.admin) na instância do Bigtable que contém a tabela a partir da qual planeia transmitir alterações

Adicione a biblioteca cliente Java como uma dependência

Adicione código semelhante ao seguinte ao seu ficheiro pom.xml do Maven. Substitua VERSION pela versão da biblioteca de cliente que está a usar. A versão tem de ser 2.21.0 ou posterior.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Determine as partições da tabela

Para começar a fazer pedidos ReadChangeStream, tem de conhecer as partições da sua tabela. Isto pode ser determinado através do método GenerateInitialChangeStreamPartitions. O exemplo seguinte mostra como usar este método para obter um fluxo de ByteStringRanges que representa cada partição na tabela. Cada ByteStringRange contém a chave de início e de fim de uma partição.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Processe as alterações para cada partição

Em seguida, pode processar as alterações de cada partição através do método ReadChangeStream. Este é um exemplo de como abrir uma stream para uma partição, começando a partir da hora atual.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

O comando ReadChangeStreamQuery aceita os seguintes argumentos:

  • Partição de stream (obrigatório): a partição a partir da qual as alterações são transmitidas
  • Uma das seguintes opções:
    • Hora de início: indicação de tempo de confirmação a partir da qual se iniciam as alterações de processamento
    • Continuation tokens: símbolos que representam uma posição para retomar a transmissão em fluxo a partir de
  • Hora de fim (opcional): data/hora de confirmação para parar o processamento de alterações quando for alcançada. Se não fornecer um valor, a stream continua a ler.
  • Duração do batimento cardíaco (opcional): frequência das mensagens de batimento cardíaco quando não existem novas alterações (predefinição de cinco segundos)

Altere o formato de gravação de streams

Um registo de stream de alterações devolvido é um de três tipos de respostas:

  • ChangeStreamMutation - Uma mensagem que representa um registo de alteração de dados.

  • CloseStream - Uma mensagem que indica que o cliente deve parar de ler a partir da stream.

    • Estado: indica o motivo do encerramento da stream. Uma das seguintes opções:
      • OK - A hora de fim foi atingida para a partição especificada
      • OUT_OF_RANGE - a partição especificada já não existe, o que significa que foram feitas divisões ou uniões nesta partição. Tem de criar um novo pedido ReadChangeStream para cada nova partição.
    • NewPartitions - Fornece as informações de partição atualizadas nas respostas de OUT_OF_RANGE.
    • ChangeStreamContinuationTokens - Lista de tokens usados para retomar novos pedidos ReadChangeStream a partir da mesma posição. Uma por NewPartition.
  • Heartbeat - Uma mensagem periódica com informações que podem ser usadas para verificar o estado da stream.

    • EstimatedLowWatermark - Estimativa da marca de água baixa para a partição especificada
    • ContinuationToken: token para retomar o streaming da partição indicada a partir da posição atual.

Conteúdos do registo de alterações de dados

Para informações sobre registos de fluxo de alterações, consulte o artigo O que está num registo de alteração de dados.

Faça a gestão das alterações nas partições

Quando as partições de uma tabela mudam, os pedidos ReadChangeStream devolvem uma mensagem CloseStream com as informações necessárias para retomar o streaming das novas partições.

Para uma divisão, este elemento contém várias novas partições e um ContinuationToken correspondente para cada partição. Para retomar o streaming das novas partições a partir da mesma posição, faz um novo pedido ReadChangeStream para cada nova partição com o respetivo token.

Por exemplo, se estiver a fazer streaming da partição [A,C) e esta se dividir em duas partições, [A,B) e [B,C), pode esperar a seguinte sequência de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Para retomar o streaming de cada partição a partir da mesma posição, envie os seguintes pedidos ReadChangeStreamQuery:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

Para uma união, para retomar a partir da mesma partição, tem de enviar um novo pedido ReadChangeStream que contenha cada token das partições unidas.

Por exemplo, se estiver a transmitir duas partições, [A,B) e [B,C), e estas forem unidas na partição [A,C), pode esperar a seguinte sequência de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Para retomar a partição de streaming [A, C) a partir da mesma posição, envia um ReadChangeStreamQuery da seguinte forma:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

O que se segue?