Traiter un flux de modifications Bigtable

Ce tutoriel explique comment déployer un pipeline de données dans Dataflow pour un flux en temps réel de modifications de base de données provenant du flux de modifications d'une table Bigtable. La sortie du pipeline est écrite dans une série de fichiers sur Cloud Storage.

Un exemple de jeu de données pour une application d'écoute de musique est fourni. Dans ce tutoriel, vous allez suivre les chansons écoutées, puis classer les cinq premières sur une période donnée.

Ce tutoriel est destiné aux utilisateurs techniques qui savent écrire du code et déployer des pipelines de données dans Google Cloud.

Objectifs

Ce tutoriel vous explique comment effectuer les tâches suivantes :

  • Créer une table Bigtable avec un flux de modifications activé.
  • Déployer un pipeline sur Dataflow qui transforme et génère le flux de modifications.
  • Afficher les résultats de votre pipeline de données.

Coûts

Dans ce tutoriel, vous utilisez les composants facturables de suivants Google Cloud:

Obtenez une estimation des coûts en fonction de votre utilisation prévue, utilisez le simulateur de coût.

Les nouveaux utilisateurs de peuvent bénéficier d'un essai sans frais. Google Cloud

Une fois que vous avez terminé les tâches décrites dans ce document, supprimez les ressources que vous avez créées pour éviter que des frais vous soient facturés. Pour en savoir plus, consultez la section Effectuer un nettoyage.

Avant de commencer

    Connectez-vous à votre Google Cloud compte. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $de crédits sans frais pour exécuter, tester et déployer des charges de travail.

    Installez la Google Cloud CLI. Une fois que la Google Cloud CLI est installée, initialisezla en exécutant la commande suivante :

    gcloud init

    Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.

    Créez ou sélectionnez un Google Cloud projet.

    Rôles requis pour sélectionner ou créer un projet

    • Sélectionner un projet : la sélection d'un projet ne nécessite pas de rôle IAM spécifique Vous pouvez sélectionner n'importe quel projet pour lequel un rôle vous a été attribué.
    • Créer un projet : pour créer un projet, vous avez besoin du rôle Créateur de projet (roles/resourcemanager.projectCreator), qui contient l'autorisation resourcemanager.projects.create. Découvrez comment attribuer des rôles.
    • Créez un Google Cloud projet :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par un nom pour le Google Cloud projet que vous créez.

    • Sélectionnez le Google Cloud projet que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre Google Cloud projet.

    Vérifiez que la facturation est activée pour votre Google Cloud projet.

    Activez les API Dataflow, Cloud Bigtable, API Cloud Bigtable Admin et Cloud Storage :

    Rôles requis pour activer les API

    Pour activer les API, vous avez besoin du rôle IAM Administrateur d'utilisation du service (roles/serviceusage.serviceUsageAdmin), qui contient l' serviceusage.services.enable autorisation. Découvrez comment attribuer des rôles.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com

    Installez la Google Cloud CLI. Une fois que la Google Cloud CLI est installée, initialisezla en exécutant la commande suivante :

    gcloud init

    Si vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.

    Créez ou sélectionnez un Google Cloud projet.

    Rôles requis pour sélectionner ou créer un projet

    • Sélectionner un projet : la sélection d'un projet ne nécessite pas de rôle IAM spécifique Vous pouvez sélectionner n'importe quel projet pour lequel un rôle vous a été attribué.
    • Créer un projet : pour créer un projet, vous avez besoin du rôle Créateur de projet (roles/resourcemanager.projectCreator), qui contient l'autorisation resourcemanager.projects.create. Découvrez comment attribuer des rôles.
    • Créez un Google Cloud projet :

      gcloud projects create PROJECT_ID

      Remplacez PROJECT_ID par un nom pour le Google Cloud projet que vous créez.

    • Sélectionnez le Google Cloud projet que vous avez créé :

      gcloud config set project PROJECT_ID

      Remplacez PROJECT_ID par le nom de votre Google Cloud projet.

    Vérifiez que la facturation est activée pour votre Google Cloud projet.

    Activez les API Dataflow, Cloud Bigtable, API Cloud Bigtable Admin et Cloud Storage :

    Rôles requis pour activer les API

    Pour activer les API, vous avez besoin du rôle IAM Administrateur d'utilisation du service (roles/serviceusage.serviceUsageAdmin), qui contient l' serviceusage.services.enable autorisation. Découvrez comment attribuer des rôles.

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  1. Mettez à jour et installez l'CLI cbt .
    gcloud components update
    gcloud components install cbt

Préparer l'environnement

Obtenir le code

Clonez le dépôt contenant l'exemple de code. Si vous avez déjà téléchargé ce dépôt, extrayez-le pour obtenir la dernière version.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Créer un bucket

  • Créez un bucket Cloud Storage :
    gcloud storage buckets create gs://BUCKET_NAME
    Remplacez BUCKET_NAME par un nom de bucket qui répond aux conditions requises pour le nom des buckets.
  • Créer une instance Bigtable

    Vous pouvez utiliser une instance existante pour ce tutoriel ou en créer une avec les configurations par défaut dans une région proche de vous.

    Créer une table

    L'exemple d'application suit les chansons écoutées par les utilisateurs et stocke les événements d'écoute dans Bigtable. Créez une table avec un flux de modifications activé, qui comporte une famille de colonnes (cf) et une colonne (song), et qui utilise des ID utilisateur pour les clés de ligne.

    Créez la table.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Remplacez les éléments suivants :

    • PROJECT_ID : ID du projet que vous utilisez.
    • BIGTABLE_INSTANCE_ID : ID de l'instance qui contiendra la nouvelle table.

    Démarrer le pipeline

    Ce pipeline transforme le flux de modifications en procédant comme suit :

    1. Lit le flux de modifications.
    2. Obtient le nom de la chanson.
    3. Regroupe les événements d'écoute de la chanson dans des fenêtres de N secondes.
    4. Compte les cinq premières chansons.
    5. Génère les résultats.

    Exécutez le pipeline.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Remplacez BIGTABLE_REGION par l'ID de la région dans laquelle se trouve votre instance Bigtable, par exemple us-east5.

    Comprendre le pipeline

    Les extraits de code suivants du pipeline peuvent vous aider à comprendre le code que vous exécutez.

    Lecture du flux de modifications

    Le code de cet exemple configure le flux source avec les paramètres de l'instance et de la table Bigtable spécifiques.

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    Obtenir le nom de la chanson

    Lorsqu'une chanson est écoutée, son nom est écrit dans la famille de colonnes cf et le qualificatif de colonne song. Le code extrait donc la valeur de la mutation du flux de modifications et la génère à l'étape suivante du pipeline.

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    Compter les cinq premières chansons

    Vous pouvez utiliser les fonctions Beam intégrées Count et Top.of pour obtenir les cinq premières chansons de la fenêtre actuelle.

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    Générer les résultats

    Ce pipeline écrit les résultats dans la sortie standard ainsi que dans des fichiers. Pour les fichiers, il regroupe les écritures en groupes de 10 éléments ou en segments d'une minute.

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    Afficher le pipeline

    1. Dans la Google Cloud console, accédez à la page Dataflow.

      Accéder à Dataflow

    2. Cliquez sur le job dont le nom commence par song-rank.

    3. En bas de l'écran, cliquez sur Afficher pour ouvrir le panneau des journaux.

    4. Cliquez sur Journaux du nœud de calcul pour surveiller les journaux de sortie du flux de modifications.

    Écritures en flux continu

    Utilisez l'interface de ligne de commande cbt CLI pour écrire un certain nombre d'écoutes de chansons pour différents utilisateurs dans la table song-rank. Cette opération est conçue pour s'effectuer sur quelques minutes afin de simuler la diffusion en flux continu des écoutes de chansons au fil du temps.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    Consulter le résultat

    Lisez la sortie sur Cloud Storage pour voir les chansons les plus populaires.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Exemple de résultat :

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    

    Effectuer un nettoyage

    Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.

    Supprimer le projet

      Supprimez un Google Cloud projet :

      gcloud projects delete PROJECT_ID

    Supprimer des ressources individuelles

    1. Supprimez le bucket et les fichiers.

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. Désactivez le flux de modifications dans la table.

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. Supprimez la table song-rank.

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. Arrêtez le pipeline de flux de modifications.

      1. Répertoriez les jobs pour obtenir l'ID de job.

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. Annulez le job.

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        Remplacez JOB_ID par l'ID de job affiché après la commande précédente.

    Étape suivante