Ler um fluxo de alterações com Java
A biblioteca de cliente do Cloud Bigtable para Java fornece métodos de nível inferior para processar registros de alteração de dados. No entanto, na maioria dos casos, recomendamos que você faça streaming de alterações com o Dataflow em vez de usar os métodos descritos nesta página, porque o Dataflow processa divisões e mesclagens de partições.
Antes de começar
Antes de ler um fluxo de alterações com Java, confira a Visão geral do fluxo de alterações. Depois, siga estes pré-requisitos.
Configurar a autenticação
Para usar os exemplos de Java nesta página em um ambiente de desenvolvimento local, instale e inicialize a CLI gcloud e configure o Application Default Credentials com suas credenciais de usuário.
-
Instale a Google Cloud CLI.
-
Ao usar um provedor de identidade (IdP) externo, primeiro faça login na CLI gcloud com sua identidade federada.
-
Se você estiver usando um shell local, crie credenciais de autenticação local para sua conta de usuário:
gcloud auth application-default login
Não é necessário fazer isso se você estiver usando o Cloud Shell.
Se um erro de autenticação for retornado e você estiver usando um provedor de identidade (IdP) externo, confirme se você fez login na CLI gcloud com sua identidade federada.
Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.
Para informações sobre como configurar a autenticação para um ambiente de produção, consulte Configurar o Application Default Credentials para código em execução no Google Cloud .
Ativar um fluxo de alterações
Você precisa ativar um fluxo de alterações em uma tabela antes de poder lê-lo. Também é possível criar uma nova tabela com um fluxo de alterações ativado.
Funções exigidas
Para receber as permissões necessárias para ler um fluxo de alterações do Bigtable, peça ao administrador para conceder a você o seguinte papel do IAM.
- Administrador do Bigtable (
roles/bigtable.admin) na instância do Bigtable que contém a tabela com as alterações que você pretende mostrar
Adicionar a biblioteca de cliente em Java como uma dependência
Adicione um código semelhante ao código a seguir no seu arquivo pom.xml do Maven. Substitua VERSION pela versão da biblioteca de cliente que você usa. A versão precisa ser 2.21.0 ou posterior.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Determinar as partições da tabela
Para começar a fazer solicitações ReadChangeStream, você precisa conhecer as partições da tabela. Isso pode ser determinado com o método GenerateInitialChangeStreamPartitions. O exemplo a seguir mostra como
usar esse método para receber um fluxo de
ByteStringRanges
que representa cada partição na tabela. Cada ByteStringRange contém as chaves de início e fim de uma partição.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Processar alterações de cada partição
Em seguida, processe as alterações de cada partição usando o método ReadChangeStream. Este é um exemplo de como abrir um fluxo para uma partição, a partir da hora atual.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery aceita os seguintes argumentos:
- Partição de fluxo (obrigatório): a partição que mostra o fluxo de alterações.
- Uma das seguintes opções:
- Horário de início: carimbo de data/hora de confirmação que começa a processar as alterações.
- Tokens de continuação: tokens que representam um ponto de retomada do fluxo.
- Horário de término (opcional): carimbo de data/hora de confirmação que para de processar as alterações quando atingido. Se você não fornecer um valor, a leitura do fluxo continuará.
- Duração do sinal de funcionamento (opcional): frequência das mensagens de sinal de funcionamento quando não há novas alterações (o padrão é cinco segundos).
Formato do registro do fluxo de alterações
Um registro do fluxo de alterações retornado é um dos três tipos de resposta:
ChangeStreamMutation: uma mensagem que representa um registro de alteração de dados.CloseStream: uma mensagem que indica que o cliente precisa parar a leitura do fluxo.- Status: indica o motivo para encerramento do fluxo. Uma destas:
OK: o horário de término da partição especificada foi atingido.OUT_OF_RANGE: a partição especificada não existe mais. Isso significa que divisões ou mesclagens ocorreram nessa partição. Uma nova solicitaçãoReadChangeStreamprecisará ser criada para cada nova partição.
NewPartitions: contém as informações de particionamento atualizadas sobre respostasOUT_OF_RANGE.ChangeStreamContinuationTokens: lista de tokens usados para retomar novas solicitaçõesReadChangeStreamdo mesmo ponto. Um porNewPartition.
- Status: indica o motivo para encerramento do fluxo. Uma destas:
Heartbeat: uma mensagem periódica com informações que podem ser usadas para verificar o estado do fluxo.EstimatedLowWatermark: estimativa da marca-d'água baixa da partição especificada.ContinuationToken: token para retomar o fluxo da partição especificada a partir do ponto atual.
Conteúdo do registro de alteração de dados
Para informações sobre registros de fluxo de alterações, consulte O que há em um registro de alteração de dados.
Processar alterações em partições
Quando as partições de uma tabela são alteradas, as solicitações ReadChangeStream retornam uma mensagem CloseStream com as informações necessárias para retomar o fluxo das partições novas.
Em uma divisão, ela conterá várias partições novas e um ContinuationToken correspondente para cada partição. Para retomar o fluxo das partições novas do mesmo ponto, faça uma nova solicitação ReadChangeStream para cada partição nova com o token correspondente.
Por exemplo, se você estiver mostrando o fluxo da partição [A,C) e ela for dividida em duas partições, [A,B) e [B,C), espere a seguinte sequência de eventos:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Para retomar o fluxo de cada partição do mesmo ponto, envie as seguintes solicitações ReadChangeStreamQuery:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
Em uma mesclagem, para retomar do mesmo ponto, é necessário enviar uma nova solicitação ReadChangeStream que contém cada token das partições mescladas.
Por exemplo, se você estiver mostrando o fluxo de duas partições, [A,B) e [B,C), e elas forem mescladas na partição [A,C), espere a seguinte sequência de eventos:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Para retomar o fluxo da partição [A, C) do mesmo ponto, envie uma ReadChangeStreamQuery como esta:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));