Leer un flujo de cambios con Java

La biblioteca de cliente de Cloud Bigtable para Java proporciona métodos de bajo nivel para procesar registros de cambios de datos. Sin embargo, en la mayoría de los casos, te recomendamos que transfieras los cambios con Dataflow en lugar de usar los métodos descritos en esta página, ya que Dataflow gestiona las divisiones y las combinaciones de particiones por ti.

Antes de empezar

Antes de leer un flujo de cambios con Java, asegúrate de que conoces el resumen de los flujos de cambios. A continuación, completa los requisitos previos.

Configurar la autenticación

Para usar las Java muestras de esta página en un entorno de desarrollo local, instala e inicializa la CLI de gcloud y, a continuación, configura las credenciales predeterminadas de la aplicación con tus credenciales de usuario.

    Instala Google Cloud CLI.

    Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Para obtener más información, consulta Set up authentication for a local development environment.

Para obtener información sobre cómo configurar la autenticación en un entorno de producción, consulta Set up Application Default Credentials for code running on Google Cloud.

Habilitar un flujo de cambios

Debes habilitar un flujo de cambios en una tabla para poder leerla. También puedes crear una tabla con un flujo de cambios habilitado.

Roles obligatorios

Para obtener los permisos que necesitas para leer un flujo de cambios de Bigtable, pide a tu administrador que te conceda el siguiente rol de gestión de identidades y accesos.

  • Administrador de Bigtable (roles/bigtable.admin) en la instancia de Bigtable que contiene la tabla de la que quieres transmitir los cambios

Añadir la biblioteca de cliente de Java como dependencia

Añade a tu archivo pom.xml de Maven un código similar al siguiente. Sustituye VERSION por la versión de la biblioteca de cliente que estés usando. La versión debe ser la 2.21.0 o una posterior.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Determinar las particiones de la tabla

Para empezar a hacer solicitudes ReadChangeStream, debes conocer las particiones de tu tabla. Para determinarlo, puedes usar el método GenerateInitialChangeStreamPartitions. En el siguiente ejemplo se muestra cómo usar este método para obtener un flujo de ByteStringRanges que representa cada partición de la tabla. Cada ByteStringRange contiene la clave de inicio y la clave de finalización de una partición.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Procesar los cambios de cada partición

A continuación, puede procesar los cambios de cada partición con el método ReadChangeStream. Este es un ejemplo de cómo abrir un flujo para una partición, empezando por la hora actual.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery acepta los siguientes argumentos:

  • Partición de la secuencia (obligatorio): la partición de la que se van a transmitir los cambios.
  • Una de las siguientes:
    • Hora de inicio: marca de tiempo de la confirmación a partir de la que se empezarán a procesar los cambios.
    • Tokens de continuación: tokens que representan una posición desde la que reanudar la transmisión
  • Hora de finalización (opcional): marca de tiempo de confirmación para detener el procesamiento de los cambios cuando se alcance. Si no proporciona ningún valor, el flujo seguirá leyendo.
  • Duración del latido (opcional): frecuencia de los mensajes de latido cuando no hay cambios nuevos (el valor predeterminado es de cinco segundos).

Cambiar el formato de grabación de la emisión

Un registro de flujo de cambios devuelto es uno de los tres tipos de respuesta:

  • ChangeStreamMutation: mensaje que representa un registro de cambios de datos.

  • CloseStream: mensaje que indica que el cliente debe dejar de leer de la secuencia.

    • Estado: indica el motivo por el que se ha cerrado el flujo. Una de las siguientes:
      • OK: se ha alcanzado la hora de finalización de la partición
      • OUT_OF_RANGE: la partición indicada ya no existe, lo que significa que se ha dividido o combinado. Se deberá crear una nueva solicitud ReadChangeStream para cada partición.
    • NewPartitions: proporciona la información de partición actualizada en las respuestas de OUT_OF_RANGE.
    • ChangeStreamContinuationTokens: lista de tokens usados para reanudar nuevas solicitudes ReadChangeStream desde la misma posición. Una por NewPartition.
  • Heartbeat: mensaje periódico con información que se puede usar para comprobar el estado del flujo.

    • EstimatedLowWatermark: estimación de la marca de agua mínima de la partición dada.
    • ContinuationToken: token para reanudar la transmisión de la partición indicada desde la posición actual.

Contenido de los registros de cambios de datos

Para obtener información sobre los registros de flujo de cambios, consulta Qué contiene un registro de cambio de datos.

Gestionar los cambios en las particiones

Cuando cambian las particiones de una tabla, las solicitudes ReadChangeStream devuelven un mensaje CloseStream con la información necesaria para reanudar la transmisión desde las nuevas particiones.

En el caso de una división, contendrá varias particiones nuevas y un ContinuationToken correspondiente a cada partición. Para reanudar la transmisión de las nuevas particiones desde la misma posición, debes enviar una nueva solicitud ReadChangeStream para cada partición con su token correspondiente.

Por ejemplo, si estás transmitiendo la partición [A,C) y se divide en dos particiones, [A,B) y [B,C), puedes esperar la siguiente secuencia de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Para reanudar la transmisión de cada partición desde la misma posición, envía las siguientes solicitudes ReadChangeStreamQuery:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

En el caso de una combinación, para reanudarla desde la misma partición, debes enviar una nueva solicitudReadChangeStream que contenga cada token de las particiones combinadas.

Por ejemplo, si está transmitiendo dos particiones, [A,B) y [B,C), y se combinan en la partición [A,C), puede esperar la siguiente secuencia de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Para reanudar la transmisión de la partición [A, C) desde la misma posición, envía un ReadChangeStreamQuery como el siguiente:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

Siguientes pasos