Transmitir cambios con Dataflow
El conector de Bigtable para Beam te permite usar Dataflow para leer registros de cambios de datos de Bigtable sin tener que monitorizar ni procesar los cambios de partición en tu código, ya que el conector se encarga de esa lógica.
En este documento se describe cómo configurar y usar el conector de Bigtable Beam para leer un flujo de cambios mediante una canalización de Dataflow. Antes de leer este documento, debes consultar la descripción general de los flujos de cambios y familiarizarte con Dataflow.
Alternativas a la creación de tu propia canalización
Si no quieres crear tu propia canalización de Dataflow, puedes usar una de las siguientes opciones.
Puede usar una plantilla de Dataflow proporcionada por Google.
También puedes usar los ejemplos de código del tutorial o la guía de inicio rápido de Bigtable como punto de partida para tu código.
Asegúrate de que el código que generes use la versión 26.14.0 de google cloud libraries-bom
o una posterior.
Detalles del conector
El método del conector de Bigtable Beam, BigtableIO.readChangeStream
, te permite leer un flujo de registros de cambios de datos (ChangeStreamMutation
) que puedes procesar. El conector de Bigtable Beam es un componente del repositorio de GitHub de Apache Beam. Para ver una descripción del código del conector, consulta los comentarios en BigtableIO.java
.
Debes usar el conector con la versión 2.48.0 de Beam o una posterior. Consulta la compatibilidad con el tiempo de ejecución de Apache Beam para asegurarte de que utilizas una versión compatible de Java. Después, puedes implementar una canalización que use el conector en Dataflow, que se encarga del aprovisionamiento y la gestión de los recursos, y que ayuda a mejorar la escalabilidad y la fiabilidad del procesamiento de datos de streaming.
Para obtener más información sobre el modelo de programación de Apache Beam, consulta la documentación de Beam.
Agrupar datos sin horas de evento
Los registros de cambios de datos transmitidos mediante el conector Beam de Bigtable no son compatibles con las funciones de Dataflow que dependen de las horas de los eventos.
Como se explica en Replicación y marcas de agua, es posible que una marca de agua baja no avance si la replicación de la partición no se ha puesto al día con el resto de la instancia. Cuando una marca de agua baja deja de avanzar, puede provocar que el flujo de cambios se detenga.
Para evitar que el flujo se detenga, el conector de Bigtable Beam genera todos los datos con una marca de tiempo de salida de cero. La marca de tiempo cero hace que Dataflow considere que todos los registros de cambios de datos son datos tardíos. Por lo tanto, las funciones de Dataflow que dependen de las horas de los eventos no son compatibles con los flujos de cambios de Bigtable. En concreto, no puedes usar funciones de ventana, activadores de tiempo de evento ni temporizadores de tiempo de evento.
En su lugar, puedes usar GlobalWindows con activadores de tiempo que no sean de eventos para agrupar estos datos tardíos en paneles, como se muestra en el ejemplo del tutorial. Para obtener más información sobre los activadores y los paneles, consulta la sección Activadores de la guía de programación de Beam.
Autoescalado
El conector admite el escalado automático de Dataflow, que está habilitado de forma predeterminada cuando se usa Runner v2 (obligatorio). El algoritmo de escalado automático de Dataflow tiene en cuenta el backlog estimado del flujo de cambios, que se puede monitorizar en la página Monitorización de Dataflow de la sección Backlog
. Usa la marca --maxNumWorkers
al implementar una tarea para limitar el número de trabajadores.
Para escalar tu flujo de procesamiento manualmente en lugar de usar el autoescalado, consulta Escalar un flujo de procesamiento de datos manualmente.
Limitaciones
Ten en cuenta las siguientes limitaciones antes de usar el conector Bigtable Beam con Dataflow.
Ejecutor de Dataflow V2
El conector solo se puede ejecutar con Dataflow Runner v2.
Para habilitar esta opción, especifica --experiments=use_runner_v2
en los argumentos de la línea de comandos. Si ejecutas el flujo de trabajo con Runner v1, se producirá un error con la siguiente excepción:
java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow
Capturas
El conector no admite instantáneas de flujo de datos.
Duplicados
El conector de Beam de Bigtable transmite los cambios de cada clave de fila y de cada clúster en orden de marca de tiempo de confirmación, pero, como a veces se reinicia desde momentos anteriores de la secuencia, puede producir duplicados.
Reinicios de la canalización
Si una canalización de Dataflow se ha detenido durante mucho tiempo, los registros de cambios de datos pueden quedar fuera del límite de conservación. Cuando se reanuda la canalización, Bigtable la rechaza para que puedas iniciar una nueva canalización con una nueva hora de inicio de la solicitud que esté dentro del periodo de conservación. Bigtable hace esto en lugar de adelantar en silencio la hora de la solicitud de la canalización original para evitar que se eliminen por error registros de cambios de datos con marcas de tiempo que queden fuera del periodo de conservación especificado.
Antes de empezar
Antes de usar el conector, completa los siguientes requisitos previos.
Configurar la autenticación
Para usar las Java muestras de esta página en un entorno de desarrollo local, instala e inicializa la CLI de gcloud y, a continuación, configura las credenciales predeterminadas de la aplicación con tus credenciales de usuario.
Instala Google Cloud CLI.
Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
Para obtener más información, consulta Set up authentication for a local development environment.
Para obtener información sobre cómo configurar la autenticación en un entorno de producción, consulta Set up Application Default Credentials for code running on Google Cloud.
Habilitar un flujo de cambios
Debes habilitar un flujo de cambios en una tabla para poder leerla. También puedes crear una tabla con los flujos de cambios habilitados.
Cambiar la tabla de metadatos de un flujo
Cuando transmite cambios con Dataflow, el conector Beam de Bigtable crea una tabla de metadatos llamada __change_stream_md_table
de forma predeterminada. La tabla de metadatos de la secuencia de cambios gestiona el estado operativo del conector y almacena metadatos sobre los registros de cambios de datos.
De forma predeterminada, el conector crea la tabla en la misma instancia que la tabla que se está transmitiendo. Para que la tabla funcione correctamente, el perfil de aplicación de la tabla de metadatos debe usar el enrutamiento de un solo clúster y tener habilitadas las transacciones de una sola fila.
Para obtener más información sobre los cambios de Bigtable con el conector Bigtable Beam, consulta la documentación de BigtableIO.
Roles obligatorios
Para obtener los permisos que necesitas para leer un flujo de cambios de Bigtable con Dataflow, pide a tu administrador que te conceda los siguientes roles de gestión de identidades y accesos.
Para leer los cambios de Bigtable, necesitas este rol:
- Administrador de Bigtable (roles/bigtable.admin) en la instancia de Bigtable que contiene la tabla de la que quieres transmitir los cambios
Para ejecutar el trabajo de Dataflow, necesitas estos roles:
- Desarrollador de Dataflow (
roles/dataflow.developer
) en el proyecto que contiene tus recursos de Cloud - Trabajador de Dataflow (roles/dataflow.worker) en el proyecto que contiene tus recursos de Cloud
- Administrador de objetos de Storage (roles/storage.objectAdmin) en los segmentos de Cloud Storage que quieras usar
Para obtener más información sobre cómo conceder roles, consulta el artículo sobre cómo gestionar el acceso.
También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.
Añadir el conector de Bigtable Beam como dependencia
Añade a tu archivo pom.xml de Maven un código similar a la siguiente dependencia. La versión debe ser 2.48.0 o posterior.
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
Leer el flujo de cambios
Para crear una canalización de Dataflow que lea los registros de cambios de datos, configura el conector y, a continuación, añade transformaciones y receptores. Después, usa el conector para leer objetos ChangeStreamMutation
en una canalización de Beam.
Los ejemplos de código de esta sección, escritos en Java, muestran cómo crear una canalización y usarla para convertir pares clave-valor en una cadena. Cada par consta de una clave de fila y un objeto ChangeStreamMutation
. El proceso convierte las entradas de cada objeto en una cadena separada por comas.
Crear el flujo de procesamiento
En este ejemplo de código Java se muestra cómo crear la canalización:
Procesar los registros de cambios de datos
En este ejemplo se muestra cómo recorrer todas las entradas de un registro de cambios de datos de una fila y llamar a un método de conversión a cadena en función del tipo de entrada.
Para ver una lista de los tipos de entradas que puede contener un registro de cambios de datos, consulta Qué contiene un registro de cambios de datos.
En este ejemplo, se convierte una entrada write:
En este ejemplo, se convierte una entrada de eliminación de celdas:
En este ejemplo, se convierte una entrada de eliminación de una familia de columnas:
Monitorizar
Los siguientes recursos de la consola de Google Cloud GCPGoogle Cloud te permiten monitorizar tus recursos mientras ejecutas una canalización de Dataflow para leer un flujo de cambios de Bigtable:
En concreto, comprueba las siguientes métricas:
- En la página de estadísticas del sistema de Bigtable, consulta las siguientes métricas:
- Uso de la CPU por parte de los datos de los flujos de cambios en la métrica
cpu_load_by_app_profile_by_method_by_table
. Muestra el impacto del flujo de cambios en el uso de la CPU de tu clúster. - Cambiar el uso del almacenamiento de la secuencia de cambios (bytes)
(
change_stream_log_used_bytes
).
- Uso de la CPU por parte de los datos de los flujos de cambios en la métrica
En la página de monitorización de Dataflow, comprueba la actualización de los datos. Esta métrica muestra la diferencia entre la hora actual y la marca de agua, que es de aproximadamente dos minutos, con picos ocasionales que duran uno o dos minutos más. La actualización de los datos no indica si los registros de cambios de datos se están procesando lentamente. Para asegurar el buen estado y el rendimiento continuos de tus aplicaciones críticas, monitoriza la métrica de actualización de datos de Dataflow y lleva a cabo las siguientes acciones:
- Si la métrica de actualización de los datos es sistemáticamente superior al umbral, es posible que tu canalización no tenga suficientes recursos. Te recomendamos que añadas más trabajadores de Dataflow.
- Si los trabajadores de Dataflow están bien aprovisionados, pero la actualización de los datos ha aumentado o es constantemente alta, ponte en contacto con el equipo de Asistencia deGoogle Cloud .
La métrica
processing_delay_from_commit_timestamp_MEAN
de Dataflow puede indicar el tiempo medio de procesamiento de los registros de cambios de datos durante el tiempo de vida de la tarea.
La métrica server/latencies
de Bigtable no es útil cuando monitorizas una canalización de Dataflow que está leyendo un flujo de cambios de Bigtable, ya que refleja la duración de la solicitud de streaming, no la latencia de procesamiento de los registros de cambios de datos. Una latencia alta en un flujo de cambios no significa que las solicitudes se estén procesando lentamente, sino que la conexión ha estado abierta durante ese tiempo.
Siguientes pasos
- Consulta cómo escribir datos de Dataflow en Cloud Storage.
- Consulta la lista completa de métricas de monitorización que proporciona Bigtable.
- Usa la monitorización para consultar las métricas de Dataflow.