Leer un flujo de cambios con Java
La biblioteca de cliente de Cloud Bigtable para Java proporciona métodos de bajo nivel para procesar registros de cambios de datos. Sin embargo, en la mayoría de los casos, te recomendamos que transfieras los cambios con Dataflow en lugar de usar los métodos descritos en esta página, ya que Dataflow gestiona las divisiones y las combinaciones de particiones por ti.
Antes de empezar
Antes de leer un flujo de cambios con Java, asegúrate de que conoces el resumen de los flujos de cambios. A continuación, completa los requisitos previos.
Configurar la autenticación
Para usar las Java muestras de esta página en un entorno de desarrollo local, instala e inicializa la CLI de gcloud y, a continuación, configura las credenciales predeterminadas de la aplicación con tus credenciales de usuario.
Instala Google Cloud CLI.
Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
Para obtener más información, consulta Set up authentication for a local development environment.
Para obtener información sobre cómo configurar la autenticación en un entorno de producción, consulta Set up Application Default Credentials for code running on Google Cloud.
Habilitar un flujo de cambios
Debes habilitar un flujo de cambios en una tabla para poder leerla. También puedes crear una tabla con un flujo de cambios habilitado.
Roles obligatorios
Para obtener los permisos que necesitas para leer un flujo de cambios de Bigtable, pide a tu administrador que te conceda el siguiente rol de gestión de identidades y accesos.
- Administrador de Bigtable
(
roles/bigtable.admin
) en la instancia de Bigtable que contiene la tabla de la que quieres transmitir los cambios
Añadir la biblioteca de cliente de Java como dependencia
Añade a tu archivo pom.xml
de Maven un código similar al siguiente. Sustituye VERSION
por la versión de la biblioteca de cliente que estés usando. La versión debe ser la 2.21.0 o una posterior.
<dependencies>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigtable</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Determinar las particiones de la tabla
Para empezar a hacer solicitudes ReadChangeStream
, debes conocer las particiones de tu tabla. Para determinarlo, puedes usar el método GenerateInitialChangeStreamPartitions
. En el siguiente ejemplo se muestra cómo usar este método para obtener un flujo de ByteStringRanges
que representa cada partición de la tabla. Cada ByteStringRange
contiene la clave de inicio y la clave de finalización de una partición.
ServerStream<ByteStringRange> partitionStream =
client.generateInitialChangeStreamPartitions("MyTable");
Procesar los cambios de cada partición
A continuación, puede procesar los cambios de cada partición con el método ReadChangeStream
. Este es un ejemplo de cómo abrir un flujo para una partición, empezando por la hora actual.
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("MyTable")
.streamPartition(partition)
.startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);
ReadChangeStreamQuery
acepta los siguientes argumentos:
- Partición de la secuencia (obligatorio): la partición de la que se van a transmitir los cambios.
- Una de las siguientes:
- Hora de inicio: marca de tiempo de la confirmación a partir de la que se empezarán a procesar los cambios.
- Tokens de continuación: tokens que representan una posición desde la que reanudar la transmisión
- Hora de finalización (opcional): marca de tiempo de confirmación para detener el procesamiento de los cambios cuando se alcance. Si no proporciona ningún valor, el flujo seguirá leyendo.
- Duración del latido (opcional): frecuencia de los mensajes de latido cuando no hay cambios nuevos (el valor predeterminado es de cinco segundos).
Cambiar el formato de grabación de la emisión
Un registro de flujo de cambios devuelto es uno de los tres tipos de respuesta:
ChangeStreamMutation
: mensaje que representa un registro de cambios de datos.CloseStream
: mensaje que indica que el cliente debe dejar de leer de la secuencia.- Estado: indica el motivo por el que se ha cerrado el flujo. Una de las siguientes:
OK
: se ha alcanzado la hora de finalización de la particiónOUT_OF_RANGE
: la partición indicada ya no existe, lo que significa que se ha dividido o combinado. Se deberá crear una nueva solicitudReadChangeStream
para cada partición.
NewPartitions
: proporciona la información de partición actualizada en las respuestas deOUT_OF_RANGE
.ChangeStreamContinuationTokens
: lista de tokens usados para reanudar nuevas solicitudesReadChangeStream
desde la misma posición. Una porNewPartition
.
- Estado: indica el motivo por el que se ha cerrado el flujo. Una de las siguientes:
Heartbeat
: mensaje periódico con información que se puede usar para comprobar el estado del flujo.EstimatedLowWatermark
: estimación de la marca de agua mínima de la partición dada.ContinuationToken
: token para reanudar la transmisión de la partición indicada desde la posición actual.
Contenido de los registros de cambios de datos
Para obtener información sobre los registros de flujo de cambios, consulta Qué contiene un registro de cambio de datos.
Gestionar los cambios en las particiones
Cuando cambian las particiones de una tabla, las solicitudes ReadChangeStream
devuelven un mensaje CloseStream
con la información necesaria para reanudar la transmisión desde las nuevas particiones.
En el caso de una división, contendrá varias particiones nuevas y un ContinuationToken
correspondiente a cada partición. Para reanudar la transmisión de las nuevas particiones desde la misma posición, debes enviar una nueva solicitud ReadChangeStream
para cada partición con su token correspondiente.
Por ejemplo, si estás transmitiendo la partición [A,C)
y se divide en dos particiones, [A,B)
y [B,C)
, puedes esperar la siguiente secuencia de eventos:
ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
ChangeStreamContinuationTokens = List(foo, bar)
)
Para reanudar la transmisión de cada partición desde la misma posición, envía las siguientes solicitudes ReadChangeStreamQuery
:
ReadChangeStreamQuery queryAB =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, B))
.continuationTokens(List.of(foo));
ReadChangeStreamQuery queryBC =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(B, C))
.continuationTokens(List.of(bar));
En el caso de una combinación, para reanudarla desde la misma partición, debes enviar una nueva solicitudReadChangeStream
que contenga cada token de las particiones combinadas.
Por ejemplo, si está transmitiendo dos particiones, [A,B)
y [B,C)
, y se combinan en la partición [A,C)
, puede esperar la siguiente secuencia de eventos:
ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(foo)
)
ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
Status = OUT_OF_RANGE,
NewPartitions = List(ByteStringRange(A, C)),
ChangeStreamContinuationTokens = List(bar)
)
Para reanudar la transmisión de la partición [A, C)
desde la misma posición, envía un ReadChangeStreamQuery
como el siguiente:
ReadChangeStreamQuery query =
ReadChangeStreamQuery.create("myTable")
.streamPartition(ByteStringRange(A, C))
.continuationTokens(List.of(foo, bar));