Leia uma stream de alterações com Java
A biblioteca de cliente do Cloud Bigtable para Java oferece métodos de baixo nível para processar registos de alterações de dados. No entanto, na maioria dos casos, recomendamos que transmita alterações com o Dataflow em vez de usar os métodos descritos nesta página, porque o Dataflow processa as divisões e as uniões de partições por si.
Antes de começar
Antes de ler um fluxo de alterações com Java, certifique-se de que conhece a vista geral dos fluxos de alterações. Em seguida, conclua os seguintes pré-requisitos.
Configure a autenticação
Para usar os Java exemplos nesta página num ambiente de desenvolvimento local, instale e inicialize a CLI gcloud e, em seguida, configure as Credenciais predefinidas da aplicação com as suas credenciais de utilizador.
Instale a CLI Google Cloud.
Se estiver a usar um fornecedor de identidade (IdP) externo, primeiro tem de iniciar sessão na CLI gcloud com a sua identidade federada.
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
Para mais informações, consulte Set up authentication for a local development environment.
Para informações sobre como configurar a autenticação para um ambiente de produção, consulte Set up Application Default Credentials for code running on Google Cloud.
Ative uma stream de alterações
Tem de ativar um fluxo de alterações numa tabela antes de o poder ler. Também pode criar uma nova tabela com uma stream de alterações ativada.
Funções necessárias
Para receber as autorizações de que precisa para ler um fluxo de alterações do Bigtable, peça ao seu administrador que lhe conceda a seguinte função de IAM.
- Administrador do Bigtable
(
roles/bigtable.admin
) na instância do Bigtable que contém a tabela a partir da qual planeia transmitir alterações
Adicione a biblioteca cliente Java como uma dependência
Adicione código semelhante ao seguinte ao seu ficheiro pom.xml
do Maven. Substitua
VERSION
pela versão da biblioteca de cliente que está a usar. A versão tem de ser 2.21.0 ou posterior.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Determine as partições da tabela
Para começar a fazer pedidos ReadChangeStream
, tem de conhecer as partições da sua tabela. Isto pode ser determinado através do método GenerateInitialChangeStreamPartitions
. O exemplo seguinte mostra como usar este método para obter um fluxo de ByteStringRanges
que representa cada partição na tabela. Cada ByteStringRange
contém a chave de início e de fim de uma partição.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Processe as alterações para cada partição
Em seguida, pode processar as alterações de cada partição através do método ReadChangeStream
. Este é um exemplo de como abrir uma stream para uma partição, começando
a partir da hora atual.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
O comando ReadChangeStreamQuery
aceita os seguintes argumentos:
- Partição de stream (obrigatório): a partição a partir da qual as alterações são transmitidas
- Uma das seguintes opções:
- Hora de início: indicação de tempo de confirmação a partir da qual se iniciam as alterações de processamento
- Continuation tokens: símbolos que representam uma posição para retomar a transmissão em fluxo a partir de
- Hora de fim (opcional): data/hora de confirmação para parar o processamento de alterações quando for alcançada. Se não fornecer um valor, a stream continua a ler.
- Duração do batimento cardíaco (opcional): frequência das mensagens de batimento cardíaco quando não existem novas alterações (predefinição de cinco segundos)
Altere o formato de gravação de streams
Um registo de stream de alterações devolvido é um de três tipos de respostas:
ChangeStreamMutation
- Uma mensagem que representa um registo de alteração de dados.CloseStream
- Uma mensagem que indica que o cliente deve parar de ler a partir da stream.- Estado: indica o motivo do encerramento da stream. Uma das seguintes opções:
OK
- A hora de fim foi atingida para a partição especificadaOUT_OF_RANGE
- a partição especificada já não existe, o que significa que foram feitas divisões ou uniões nesta partição. Tem de criar um novo pedidoReadChangeStream
para cada nova partição.
NewPartitions
- Fornece as informações de partição atualizadas nas respostas deOUT_OF_RANGE
.ChangeStreamContinuationTokens
- Lista de tokens usados para retomar novos pedidosReadChangeStream
a partir da mesma posição. Uma porNewPartition
.
- Estado: indica o motivo do encerramento da stream. Uma das seguintes opções:
Heartbeat
- Uma mensagem periódica com informações que podem ser usadas para verificar o estado da stream.EstimatedLowWatermark
- Estimativa da marca de água baixa para a partição especificadaContinuationToken
: token para retomar o streaming da partição indicada a partir da posição atual.
Conteúdos do registo de alterações de dados
Para informações sobre registos de fluxo de alterações, consulte o artigo O que está num registo de alteração de dados.
Faça a gestão das alterações nas partições
Quando as partições de uma tabela mudam, os pedidos ReadChangeStream
devolvem uma mensagem CloseStream
com as informações necessárias para retomar o streaming das novas partições.
Para uma divisão, este elemento contém várias novas partições e um ContinuationToken
correspondente para cada partição. Para retomar o streaming das novas partições
a partir da mesma posição, faz um novo pedido ReadChangeStream
para cada nova partição com o respetivo token.
Por exemplo, se estiver a fazer streaming da partição [A,C)
e esta se dividir em duas partições, [A,B)
e [B,C)
, pode esperar a seguinte sequência de eventos:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Para retomar o streaming de cada partição a partir da mesma posição, envie os seguintes pedidos ReadChangeStreamQuery
:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Para uma união, para retomar a partir da mesma partição, tem de enviar um novo pedido
ReadChangeStream
que contenha cada token das partições unidas.
Por exemplo, se estiver a transmitir duas partições, [A,B)
e [B,C)
, e estas forem
unidas na partição [A,C)
, pode esperar a seguinte sequência de eventos:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Para retomar a partição de streaming [A, C)
a partir da mesma posição, envia um
ReadChangeStreamQuery
da seguinte forma:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));