En este tutorial se muestra cómo implementar una canalización de datos en Dataflow para obtener un flujo en tiempo real de los cambios de una base de datos procedentes del flujo de cambios de una tabla de Bigtable. La salida del flujo de procesamiento se escribe en una serie de archivos de Cloud Storage.
Se proporciona un conjunto de datos de ejemplo para una aplicación de escucha de música. En este tutorial, se monitorizan las canciones que se escuchan y, a continuación, se clasifican las cinco más populares durante un periodo determinado.
Este tutorial está dirigido a usuarios técnicos que sepan escribir código y desplegar flujos de procesamiento de datos en Google Cloud.
Preparar el entorno
Obtén el código
Clona el repositorio que contiene el código de ejemplo. Si ya has descargado este repositorio, haz un pull para obtener la versión más reciente.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
Crear un segmento
gcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAME
with a bucket name
that meets the bucket naming requirements.
Crear una instancia de Bigtable
Puedes usar una instancia que ya tengas para este tutorial o crear una con las configuraciones predeterminadas en una región cercana.
Crear una tabla
La aplicación de ejemplo monitoriza las canciones que escuchan los usuarios y almacena los eventos de escucha en Bigtable. Crea una tabla con un flujo de cambios habilitado que tenga una familia de columnas (cf) y una columna (canción), y que use IDs de usuario como claves de fila.
Crea la tabla.
gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
Haz los cambios siguientes:
- PROJECT_ID: el ID del proyecto que estás usando
- BIGTABLE_INSTANCE_ID: el ID de la instancia que contendrá la nueva tabla
Iniciar el flujo de procesamiento
Esta canalización transforma el flujo de cambios de la siguiente manera:
- Lee el flujo de cambios
- Obtiene el nombre de la canción.
- Agrupa los eventos de escucha de canciones en ventanas de N segundos.
- Cuenta las cinco canciones más escuchadas
- Muestra los resultados
Ejecuta el flujo de procesamiento.
mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
Sustituye BIGTABLE_REGION por el ID de la región en la que se encuentra tu instancia de Bigtable, como us-east5
.
Información sobre el flujo de trabajo
Los siguientes fragmentos de código de la canalización pueden ayudarte a entender el código que estás ejecutando.
Leer el flujo de cambios
El código de este ejemplo configura el flujo de origen con los parámetros de la instancia y la tabla de Bigtable específicas.
Obtener el nombre de la canción
Cuando se escucha una canción, el nombre de la canción se escribe en la familia de columnas cf
y en el calificador de columna song
, por lo que el código extrae el valor de la mutación del flujo de cambios y lo envía al siguiente paso de la canalización.
Contar las cinco canciones más escuchadas
Puedes usar las funciones integradas de Beam Count
y Top.of
para obtener las cinco canciones más populares de la ventana actual.
Generar los resultados
Esta canalización escribe los resultados en la salida estándar, así como en archivos. En el caso de los archivos, las escrituras se agrupan en grupos de 10 elementos o en segmentos de un minuto.
Ver el flujo de procesamiento
En la Google Cloud consola, ve a la página Dataflow.
Haz clic en el trabajo cuyo nombre empiece por song-rank.
En la parte inferior de la pantalla, haz clic en Mostrar para abrir el panel de registros.
Haz clic en Registros de trabajador para monitorizar los registros de salida del flujo de cambios.
Operaciones de escritura de flujos
Usa la
cbt
CLI
para escribir el número de escuchas de canciones de varios usuarios en la tabla song-rank
. Está diseñada para escribir durante unos minutos y simular que se escuchan canciones en streaming a lo largo del tiempo.
cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1
Ver el resultado
Lee el resultado en Cloud Storage para ver las canciones más populares.
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
Ejemplo:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]