Traiter un flux de modifications Bigtable

Ce tutoriel explique comment déployer un pipeline de données dans Dataflow pour un flux en temps réel des modifications apportées à la base de données, provenant du flux de modifications d'une table Bigtable. La sortie du pipeline est écrite dans une série de fichiers sur Cloud Storage.

Un exemple de jeu de données pour une application d'écoute de musique est fourni. Dans ce tutoriel, vous allez suivre les chansons écoutées, puis classer les cinq meilleures sur une période donnée.

Ce tutoriel est destiné aux utilisateurs techniques qui savent écrire du code et déployer des pipelines de données sur Google Cloud.

Préparer l'environnement

Obtenir le code

Clonez le dépôt contenant l'exemple de code. Si vous avez déjà téléchargé ce dépôt, extrayez-le pour obtenir la dernière version.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Créer un bucket

  • Create a Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Replace BUCKET_NAME with a bucket name that meets the bucket naming requirements.

    Créer une instance Bigtable

    Vous pouvez utiliser une instance existante pour ce tutoriel ou créer une instance avec les configurations par défaut dans une région proche de vous.

    Créer une table

    L'application exemple suit les titres écoutés par les utilisateurs et stocke les événements d'écoute dans Bigtable. Créez une table avec un flux de modifications activé, qui comporte une famille de colonnes (cf) et une colonne (song), et qui utilise des ID utilisateur pour les clés de ligne.

    Créez la table.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du projet que vous utilisez
    • BIGTABLE_INSTANCE_ID : ID de l'instance qui contiendra la nouvelle table.

    Démarrer le pipeline

    Ce pipeline transforme le flux de modifications en effectuant les opérations suivantes :

    1. Lit le flux de modifications
    2. Récupère le nom du titre
    3. Regroupe les événements d'écoute de titres dans des fenêtres de N secondes.
    4. Compte les cinq titres les plus écoutés
    5. Génère les résultats

    Exécutez le pipeline.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Remplacez BIGTABLE_REGION par l'ID de la région dans laquelle se trouve votre instance Bigtable, par exemple us-east5.

    Comprendre le pipeline

    Les extraits de code suivants du pipeline peuvent vous aider à comprendre le code que vous exécutez.

    Lire le flux de modifications

    Le code de cet exemple configure le flux source avec les paramètres de l'instance et de la table Bigtable spécifiques.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Obtenir le nom du titre

    Lorsqu'un utilisateur écoute un titre, son nom est écrit dans la famille de colonnes cf et le qualificatif de colonne song. Le code extrait donc la valeur de la mutation du flux de modifications et la transmet à l'étape suivante du pipeline.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Compter les cinq titres les plus écoutés

    Vous pouvez utiliser les fonctions Beam intégrées Count et Top.of pour obtenir les cinq titres les plus écoutés dans la fenêtre actuelle.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Afficher les résultats

    Ce pipeline écrit les résultats dans la sortie standard et dans des fichiers. Pour les fichiers, il regroupe les écritures par groupes de 10 éléments ou par segments d'une minute.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Afficher le pipeline

    1. Dans la console Google Cloud , accédez à la page Dataflow.

      Accéder à Dataflow

    2. Cliquez sur le job dont le nom commence par song-rank.

    3. En bas de l'écran, cliquez sur Afficher pour ouvrir le panneau des journaux.

    4. Cliquez sur Journaux des nœuds de calcul pour surveiller les journaux de sortie du flux de modifications.

    Écritures de flux

    Utilisez la CLI cbt pour écrire le nombre d'écoutes de titres pour différents utilisateurs dans la table song-rank. Il est conçu pour écrire pendant quelques minutes afin de simuler des écoutes de titres en streaming au fil du temps.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Consulter le résultat

    Lisez les résultats sur Cloud Storage pour découvrir les titres les plus populaires.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Exemple de résultat :

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]