En este instructivo, se muestra cómo implementar una canalización de datos en Dataflow para obtener una transmisión en tiempo real de los cambios de la base de datos provenientes del flujo de cambios de una tabla de Bigtable. El resultado de la canalización se escribe en una serie de archivos en Cloud Storage.
Se proporciona un conjunto de datos de ejemplo para una aplicación de escucha de música. En este instructivo, se hace un seguimiento de las canciones que se escuchan y, luego, se clasifican las cinco mejores durante un período.
Este instructivo está dirigido a usuarios técnicos que estén familiarizados con la escritura de código y la implementación de canalizaciones de datos en Google Cloud.
Objetivos
En este instructivo, se muestra cómo realizar lo siguiente:
- Crea una tabla de Bigtable con un flujo de cambios habilitado.
- Implementa una canalización en Dataflow que transforme y genere el flujo de cambios.
- Visualiza los resultados de tu canalización de datos.
Costos
En este documento, usarás los siguientes componentes facturables de Google Cloud:
Para generar una estimación de costos en función del uso previsto,
usa la calculadora de precios.
Cuando completes las tareas que se describen en este documento, podrás borrar los recursos que creaste para evitar que se te siga facturando. Para obtener más información, consulta Realiza una limpieza.
Antes de comenzar
- Seleccionar un proyecto: Para seleccionar un proyecto, no se requiere un rol de IAM específico. Puedes seleccionar cualquier proyecto en el que se te haya otorgado un rol.
-
Crear un proyecto: Para crear un proyecto, necesitas el rol de creador de proyectos
(
roles/resourcemanager.projectCreator), que contiene elresourcemanager.projects.createpermiso. Obtén información para otorgar roles. -
Crea un proyecto de: Google Cloud
gcloud projects create PROJECT_ID
Reemplaza
PROJECT_IDpor un nombre para el Google Cloud proyecto de que estás creando. -
Selecciona el Google Cloud proyecto de que creaste:
gcloud config set project PROJECT_ID
Reemplaza
PROJECT_IDpor el nombre de tu Google Cloud proyecto de. - Seleccionar un proyecto: Para seleccionar un proyecto, no se requiere un rol de IAM específico. Puedes seleccionar cualquier proyecto en el que se te haya otorgado un rol.
-
Crear un proyecto: Para crear un proyecto, necesitas el rol de creador de proyectos
(
roles/resourcemanager.projectCreator), que contiene elresourcemanager.projects.createpermiso. Obtén información para otorgar roles. -
Crea un proyecto de: Google Cloud
gcloud projects create PROJECT_ID
Reemplaza
PROJECT_IDpor un nombre para el Google Cloud proyecto de que estás creando. -
Selecciona el Google Cloud proyecto de que creaste:
gcloud config set project PROJECT_ID
Reemplaza
PROJECT_IDpor el nombre de tu Google Cloud proyecto de. - Actualiza e instala la
cbtCLI .gcloud components update gcloud components install cbt
Instala la Google Cloud CLI. Después de la instalación, inicializa Google Cloud CLI con el siguiente comando:
gcloud initSi usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.
Crea o selecciona un Google Cloud proyecto.
Roles necesarios para seleccionar o crear un proyecto
Verifica que la facturación esté habilitada para tu Google Cloud proyecto.
Habilita las APIs de Dataflow, Cloud Bigtable, API de Cloud Bigtable Admin y Cloud Storage:
Roles necesarios para habilitar las APIs
Para habilitar las APIs, necesitas el rol de IAM de administrador de Service Usage (roles/serviceusage.serviceUsageAdmin), que contiene el
serviceusage.services.enable permiso. Obtén información para otorgar
roles.
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
Instala la Google Cloud CLI. Después de la instalación, inicializa Google Cloud CLI con el siguiente comando:
gcloud initSi usas un proveedor de identidad externo (IdP), primero debes acceder a la gcloud CLI con tu identidad federada.
Crea o selecciona un Google Cloud proyecto.
Roles necesarios para seleccionar o crear un proyecto
Verifica que la facturación esté habilitada para tu Google Cloud proyecto.
Habilita las APIs de Dataflow, Cloud Bigtable, API de Cloud Bigtable Admin y Cloud Storage:
Roles necesarios para habilitar las APIs
Para habilitar las APIs, necesitas el rol de IAM de administrador de Service Usage (roles/serviceusage.serviceUsageAdmin), que contiene el
serviceusage.services.enable permiso. Obtén información para otorgar
roles.
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
Prepare el entorno
Obtén el código
Clona el repositorio que contiene el código de muestra. Si ya descargaste este repositorio, haz un pull para obtener la versión más reciente.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
Crea un bucket
gcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAME por un nombre de bucket
que cumpla con los requisitos de nombre del bucket.
Crear una instancia de Bigtable.
Puedes usar una instancia existente para este instructivo o crear una instancia con las configuraciones predeterminadas en una región cercana.
Crea una tabla
La aplicación de ejemplo hace un seguimiento de las canciones que escuchan los usuarios y almacena los eventos de escucha en Bigtable. Crea una tabla con un flujo de cambios habilitado que tenga una familia de columnas (cf) y una columna (song) y que use IDs de usuario para las claves de fila.
Crea la tabla.
gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID del proyecto que estás usando.
- BIGTABLE_INSTANCE_ID: Es el ID de la instancia que contendrá la tabla nueva.
Comienza la canalización
Esta canalización transforma el flujo de cambios de la siguiente manera:
- Lee el flujo de cambios.
- Obtiene el nombre de la canción.
- Agrupa los eventos de escucha de canciones en ventanas de N segundos.
- Cuenta las cinco canciones principales.
- Genera los resultados.
Ejecutar la canalización
mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
Reemplaza BIGTABLE_REGION por el ID de la región en la que se encuentra tu instancia de Bigtable, como us-east5.
Comprende la canalización
Los siguientes fragmentos de código de la canalización pueden ayudarte a comprender el código que estás ejecutando.
Lectura del flujo de cambios
El código de este ejemplo configura el flujo de origen con los parámetros de la instancia y la tabla de Bigtable específicas.
Obtén el nombre de la canción
Cuando se escucha una canción, el nombre de la canción se escribe en la familia de columnas cf y el calificador de columna song, por lo que el código extrae el valor de la mutación del flujo de cambios y lo envía al siguiente paso de la canalización.
Cuenta las cinco canciones principales
Puedes usar las funciones integradas de Beam Count y Top.of para obtener las cinco canciones principales en la ventana actual.
Genera los resultados
Esta canalización escribe los resultados en la salida estándar y en los archivos. Para los archivos, muestra las escrituras en ventanas en grupos de 10 elementos o segmentos de un minuto.
Visualiza la canalización
En la Google Cloud consola de, ve a la página Dataflow.
Haz clic en el trabajo con un nombre que comience con song-rank.
En la parte inferior de la pantalla, haz clic en Mostrar para abrir el panel de registros.
Haz clic en Registros de trabajadores para supervisar los registros de salida del flujo de cambios.
Operaciones de escritura por transmisión
Usa la CLI de
cbt para escribir una cantidad de escuchas de canciones para varios usuarios en
la tabla song-rank. Esto está diseñado para escribir durante unos minutos para simular la transmisión de escuchas de canciones a lo largo del tiempo.
cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1
Revise el resultado.
Lee el resultado en Cloud Storage para ver las canciones más populares.
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
Resultado de ejemplo:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
Limpia
Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.
Borra el proyecto
Borra un Google Cloud proyecto de:
gcloud projects delete PROJECT_ID
Borra los recursos individuales
Borra el bucket y los archivos.
gcloud storage rm --recursive gs://BUCKET_NAME/Inhabilita el flujo de cambios en la tabla.
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-periodBorra la tabla
song-rank.cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rankDetén la canalización de flujo de cambios.
Enumera los trabajos para obtener el ID del trabajo.
gcloud dataflow jobs list --region=BIGTABLE_REGIONCancela el trabajo.
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGIONReemplaza JOB_ID por el ID de trabajo que se muestra después del comando anterior.