Procesa un flujo de cambios de Bigtable

En este instructivo, se muestra cómo implementar una canalización de datos en Dataflow para un flujo en tiempo real de cambios en la base de datos provenientes del flujo de cambios de una tabla de Bigtable. El resultado de la canalización se escribe en una serie de archivos en Cloud Storage.

Se proporciona un conjunto de datos de ejemplo para una aplicación de música. En este instructivo, harás un seguimiento de las canciones que se escuchan y, luego, clasificarás las cinco mejores durante un período.

Este instructivo está dirigido a usuarios técnicos que saben escribir código y desplegar canalizaciones de datos en Google Cloud.

Prepare el entorno

Obtén el código

Clona el repositorio que contiene el código de muestra. Si ya descargaste este repositorio, haz una extracción para obtener la versión más reciente.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Crea un bucket

  • Create a Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Replace BUCKET_NAME with a bucket name that meets the bucket naming requirements.

    Crear una instancia de Bigtable.

    Puedes usar una instancia existente para este instructivo o crear una instancia con la configuración predeterminada en una región cercana a ti.

    Crea una tabla

    La aplicación de ejemplo hace un seguimiento de las canciones que escuchan los usuarios y almacena los eventos de escucha en Bigtable. Crea una tabla con un flujo de cambios habilitado que tenga una familia de columnas (cf) y una columna (song), y que use IDs de usuario para las claves de fila.

    Crea la tabla.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Reemplaza lo siguiente:

    • PROJECT_ID: Es el ID del proyecto que usas.
    • BIGTABLE_INSTANCE_ID: ID de la instancia que contendrá la tabla nueva

    Comienza la canalización

    Esta canalización transforma el flujo de cambios de la siguiente manera:

    1. Lee el flujo de cambios
    2. Obtiene el nombre de la canción.
    3. Agrupa los eventos de escucha de canciones en ventanas de N segundos.
    4. Cuenta las cinco canciones principales
    5. Genera los resultados

    Ejecutar la canalización

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Reemplaza BIGTABLE_REGION por el ID de la región en la que se encuentra tu instancia de Bigtable, como us-east5.

    Información sobre la canalización

    Los siguientes fragmentos de código de la canalización pueden ayudarte a comprender el código que ejecutas.

    Lectura del flujo de cambios

    El código de este ejemplo configura la transmisión de origen con los parámetros de la instancia y la tabla específicas de Bigtable.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Cómo obtener el nombre de la canción

    Cuando se escucha una canción, el nombre de la canción se escribe en la familia de columnas cf y el calificador de columna song, por lo que el código extrae el valor de la mutación del flujo de cambios y lo genera en el siguiente paso de la canalización.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Cómo contar las cinco canciones principales

    Puedes usar las funciones integradas de Beam Count y Top.of para obtener las cinco canciones más populares en la ventana actual.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Cómo generar los resultados

    Esta canalización escribe los resultados en la salida estándar y en archivos. En el caso de los archivos, agrupa las escrituras en grupos de 10 elementos o segmentos de un minuto.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Visualiza la canalización

    1. En la consola de Google Cloud , ve a la página Dataflow.

      Ir a Dataflow

    2. Haz clic en el trabajo con un nombre que comience con song-rank.

    3. En la parte inferior de la pantalla, haz clic en Mostrar para abrir el panel de registros.

    4. Haz clic en Registros de Worker para supervisar los registros de salida del flujo de cambios.

    Operaciones de escritura por transmisión

    Usa la CLI de cbt para escribir una cantidad de reproducciones de canciones para varios usuarios en la tabla song-rank. Está diseñada para escribir durante unos minutos y simular que se escuchan canciones de forma continua a lo largo del tiempo.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Revise el resultado.

    Lee el resultado en Cloud Storage para ver las canciones más populares.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Resultado de ejemplo:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]