Lee un flujo de cambios con Java

La biblioteca cliente de Cloud Bigtable para Java proporciona métodos de bajo nivel para procesar registros de cambios de datos. Sin embargo, en la mayoría de los casos, te recomendamos que transmitas los cambios con Dataflow en lugar de usar los métodos que se describen en esta página, ya que Dataflow controla las divisiones de particiones y las uniones por ti.

Antes de comenzar

Antes de leer un flujo de cambios con Java, asegúrate de estar familiarizado con la descripción general de los flujos de cambios. Luego, completa los siguientes requisitos previos.

Configura la autenticación

Para usar las muestras de Java incluidas en esta página en un entorno de desarrollo local, instala e inicializa la gcloud CLI y, luego, configura las credenciales predeterminadas de la aplicación con tus credenciales de usuario.

  1. Instala Google Cloud CLI.

  2. Si usas un proveedor de identidad (IdP) externo, primero debes acceder a la gcloud CLI con tu identidad federada.

  3. Si usas un shell local, crea credenciales de autenticación locales para tu cuenta de usuario:

    gcloud auth application-default login

    No es necesario que lo hagas si usas Cloud Shell.

    Si se muestra un error de autenticación y usas un proveedor de identidad (IdP) externo, confirma que accediste a la gcloud CLI con tu identidad federada.

Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

Para obtener información sobre la configuración de la autenticación para un entorno de producción, consulta Configura las credenciales predeterminadas de la aplicación para el código que se ejecuta en Google Cloud .

Habilita un flujo de cambios

Debes habilitar un flujo de cambios en una tabla antes de poder leerlo. También puedes crear una tabla nueva con un flujo de cambios habilitado.

Roles obligatorios

Para obtener los permisos que necesitas para leer un flujo de cambios de Bigtable, pídele a tu administrador que te otorgue el siguiente rol de IAM.

  • Administrador de Bigtable (roles/bigtable.admin) en la instancia de Bigtable que contiene la tabla desde la que planeas transmitir los cambios

Agrega la biblioteca cliente de Java como dependencia

Agrega código similar al siguiente a tu archivo pom.xml de Maven. Reemplaza VERSION por la versión de la biblioteca cliente que usas. La versión debe ser 2.21.0 o posterior.

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigtable</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Determina las particiones de la tabla

Para comenzar a realizar solicitudes ReadChangeStream, debes conocer las particiones de tu tabla. Esto se puede determinar con el método GenerateInitialChangeStreamPartitions. En el siguiente ejemplo, se muestra cómo usar este método para obtener un flujo de ByteStringRanges que representa cada partición de la tabla. Cada ByteStringRange contiene la clave de inicio y finalización de una partición.

ServerStream<ByteStringRange> partitionStream =
    client.generateInitialChangeStreamPartitions("MyTable");

Procesa los cambios de cada partición

Luego, puedes procesar los cambios de cada partición con el método ReadChangeStream. Este es un ejemplo de cómo abrir un flujo para una partición, a partir de la hora actual.

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("MyTable")
        .streamPartition(partition)
        .startTime(Instant.now());
ServerStream<ChangeStreamRecord> changeStream = client.readChangeStream(query);

ReadChangeStreamQuery acepta los siguientes argumentos:

  • Partición de transmisión (obligatorio): Es la partición desde la que se transmiten los cambios.
  • Uno de los siguientes:
    • Hora de inicio: Es la marca de tiempo de confirmación para comenzar a procesar los cambios.
    • Tokens de continuación: Son tokens que representan una posición para reanudar la transmisión.
  • Hora de finalización (opcional): Es la marca de tiempo de confirmación para detener el procesamiento de los cambios cuando se alcanza. Si no proporcionas un valor, la transmisión continúa leyendo.
  • Duración del latido (opcional): Es la frecuencia de los mensajes de latido cuando no hay cambios nuevos (el valor predeterminado es de cinco segundos).

Formato de registro de flujo de cambios

Un registro de flujo de cambios que se muestra es uno de los tres tipos de respuesta:

  • ChangeStreamMutation - Es un mensaje que representa un registro de cambio de datos.

  • CloseStream : Es un mensaje que indica que el cliente debe dejar de leer de la transmisión.

    • Estado: Indica el motivo del cierre de la transmisión. Uno de los siguientes:
      • OK - Se alcanzó la hora de finalización de la partición determinada.
      • OUT_OF_RANGE - La partición determinada ya no existe, lo que significa que se produjeron divisiones o uniones en esta partición. Se deberá crear una nueva solicitud ReadChangeStream para cada partición nueva.
    • NewPartitions - Proporciona la información de partición actualizada en las respuestas OUT_OF_RANGE.
    • ChangeStreamContinuationTokens - Es una lista de tokens que se usan para reanudar las solicitudes ReadChangeStream nuevas desde la misma posición. Uno por NewPartition.
  • Heartbeat - Es un mensaje periódico con información que se puede usar para marcar el estado de la transmisión.

    • EstimatedLowWatermark: Es una estimación de la marca de agua baja para la partición determinada.
    • ContinuationToken - Token para reanudar la transmisión de la partición determinada desde la posición actual.

Contenido del registro de cambio de datos

Para obtener información sobre los registros de flujo de cambios, consulta ¿Qué contiene un registro de cambio de datos ?.

Controla los cambios en las particiones

Cuando cambian las particiones de una tabla, las solicitudes ReadChangeStream muestran un mensaje CloseStream con la información necesaria para reanudar la transmisión desde las particiones nuevas.

En el caso de una división, esto contendrá varias particiones nuevas y un ContinuationToken correspondiente para cada partición. Para reanudar la transmisión de las particiones nuevas desde la misma posición, debes realizar una nueva solicitud ReadChangeStream para cada partición nueva con su token correspondiente.

Por ejemplo, si transmites la partición [A,C) y se divide en dos particiones, [A,B) y [B,C), puedes esperar la siguiente secuencia de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, B), ByteStringRange(B, C))
    ChangeStreamContinuationTokens = List(foo, bar)
)

Para reanudar la transmisión de cada partición desde la misma posición, envía las siguientes solicitudes ReadChangeStreamQuery:

ReadChangeStreamQuery queryAB =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, B))
        .continuationTokens(List.of(foo));

ReadChangeStreamQuery queryBC =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(B, C))
        .continuationTokens(List.of(bar));

En el caso de una unión, para reanudar desde la misma partición, debes enviar una nueva solicitud ReadChangeStream que contenga cada token de las particiones unidas.

Por ejemplo, si transmites dos particiones, [A,B) y [B,C), y se unen en la partición [A,C), puedes esperar la siguiente secuencia de eventos:

ReadChangeStream(streamPartition = ByteStringRange(A, B)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(foo)
)

ReadChangeStream(streamPartition = ByteStringRange(B, C)) receives:
CloseStream(
    Status = OUT_OF_RANGE,
    NewPartitions = List(ByteStringRange(A, C)),
    ChangeStreamContinuationTokens = List(bar)
)

Para reanudar la transmisión de la partición [A, C) desde la misma posición, envía una ReadChangeStreamQuery como la siguiente:

ReadChangeStreamQuery query =
    ReadChangeStreamQuery.create("myTable")
        .streamPartition(ByteStringRange(A, C))
        .continuationTokens(List.of(foo, bar));

¿Qué sigue?