Procesar un flujo de cambios de Bigtable

En este tutorial se muestra cómo implementar una canalización de datos en Dataflow para obtener un flujo en tiempo real de los cambios de una base de datos procedentes del flujo de cambios de una tabla de Bigtable. La salida del flujo de procesamiento se escribe en una serie de archivos de Cloud Storage.

Se proporciona un conjunto de datos de ejemplo para una aplicación de escucha de música. En este tutorial, se monitorizan las canciones que se escuchan y, a continuación, se clasifican las cinco más populares durante un periodo determinado.

Este tutorial está dirigido a usuarios técnicos que sepan escribir código y desplegar flujos de procesamiento de datos en Google Cloud.

Objetivos

En este tutorial te explicamos cómo hacer lo siguiente:

  • Crea una tabla de Bigtable con un flujo de cambios habilitado.
  • Despliega una canalización en Dataflow que transforme y genere el flujo de cambios.
  • Consulta los resultados de tu canalización de datos.

Costes

En este documento, se utilizan los siguientes componentes facturables de Google Cloud:

Para generar una estimación de costes basada en el uso previsto, utiliza la calculadora de precios.

Los usuarios nuevos pueden disfrutar de una prueba gratuita. Google Cloud

Cuando termines las tareas que se describen en este documento, puedes evitar que se te siga facturando eliminando los recursos que hayas creado. Para obtener más información, consulta la sección Limpiar.

Antes de empezar

    Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.

    Instala Google Cloud CLI. Después de la instalación, inicializa la CLI de Google Cloud ejecutando el siguiente comando:

    gcloud init

    Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

    Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

    Verify that billing is enabled for your Google Cloud project.

    Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com

    Instala Google Cloud CLI. Después de la instalación, inicializa la CLI de Google Cloud ejecutando el siguiente comando:

    gcloud init

    Si utilizas un proveedor de identidades (IdP) externo, primero debes iniciar sesión en la CLI de gcloud con tu identidad federada.

    Create or select a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator role (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.
    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

    Verify that billing is enabled for your Google Cloud project.

    Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  1. Actualiza e instala la CLI de cbt .
    gcloud components update
    gcloud components install cbt
  2. Preparar el entorno

    Obtén el código

    Clona el repositorio que contiene el código de ejemplo. Si ya has descargado este repositorio, haz un pull para obtener la versión más reciente.

    git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
    cd java-docs-samples/bigtable/beam/change-streams
    

    Crear un segmento

  3. Create a Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Replace BUCKET_NAME with a bucket name that meets the bucket naming requirements.

    Crear una instancia de Bigtable

    Puedes usar una instancia que ya tengas para este tutorial o crear una con las configuraciones predeterminadas en una región cercana.

    Crear una tabla

    La aplicación de ejemplo monitoriza las canciones que escuchan los usuarios y almacena los eventos de escucha en Bigtable. Crea una tabla con un flujo de cambios habilitado que tenga una familia de columnas (cf) y una columna (canción), y que use IDs de usuario como claves de fila.

    Crea la tabla.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Haz los cambios siguientes:

    • PROJECT_ID: el ID del proyecto que estás usando
    • BIGTABLE_INSTANCE_ID: el ID de la instancia que contendrá la nueva tabla

    Iniciar el flujo de procesamiento

    Esta canalización transforma el flujo de cambios de la siguiente manera:

    1. Lee el flujo de cambios
    2. Obtiene el nombre de la canción.
    3. Agrupa los eventos de escucha de canciones en ventanas de N segundos.
    4. Cuenta las cinco canciones más escuchadas
    5. Muestra los resultados

    Ejecuta el flujo de procesamiento.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Sustituye BIGTABLE_REGION por el ID de la región en la que se encuentra tu instancia de Bigtable, como us-east5.

    Información sobre el flujo de trabajo

    Los siguientes fragmentos de código de la canalización pueden ayudarte a entender el código que estás ejecutando.

    Leer el flujo de cambios

    El código de este ejemplo configura el flujo de origen con los parámetros de la instancia y la tabla de Bigtable específicas.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Obtener el nombre de la canción

    Cuando se escucha una canción, el nombre de la canción se escribe en la familia de columnas cf y en el calificador de columna song, por lo que el código extrae el valor de la mutación del flujo de cambios y lo envía al siguiente paso de la canalización.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Contar las cinco canciones más escuchadas

    Puedes usar las funciones integradas de Beam Count y Top.of para obtener las cinco canciones más populares de la ventana actual.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Generar los resultados

    Esta canalización escribe los resultados en la salida estándar, así como en archivos. En el caso de los archivos, las escrituras se agrupan en grupos de 10 elementos o segmentos de un minuto.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Ver el flujo de procesamiento

    1. En la Google Cloud consola, ve a la página Dataflow.

      Ir a Dataflow

    2. Haz clic en el trabajo cuyo nombre empiece por song-rank.

    3. En la parte inferior de la pantalla, haz clic en Mostrar para abrir el panel de registros.

    4. Haz clic en Registros de trabajador para monitorizar los registros de salida del flujo de cambios.

    Operaciones de escritura de flujos

    Usa la CLI cbt para escribir el número de escuchas de canciones de varios usuarios en la tabla song-rank. Está diseñada para escribir durante unos minutos y simular que se escuchan canciones en streaming a lo largo del tiempo.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Ver el resultado

    Lee el resultado en Cloud Storage para ver las canciones más populares.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Ejemplo:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    
  4. Limpieza

    Para evitar que los recursos utilizados en este tutorial se cobren en tu cuenta de Google Cloud, elimina el proyecto que contiene los recursos o conserva el proyecto y elimina los recursos.

    Eliminar el proyecto

      Delete a Google Cloud project:

      gcloud projects delete PROJECT_ID

    Eliminar recursos concretos

    1. Elimina el contenedor y los archivos.

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. Inhabilita el flujo de cambios en la tabla.

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. Elimina la tabla song-rank.

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. Detén la canalización de flujo de cambios.

      1. Lista los trabajos para obtener el ID de trabajo.

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. Cancela la tarea.

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        Sustituye JOB_ID por el ID de trabajo que se muestra después del comando anterior.

    Siguientes pasos