Ce tutoriel explique comment déployer un pipeline de données dans Dataflow pour un flux en temps réel de modifications de base de données provenant du flux de modifications d'une table Bigtable. La sortie du pipeline est écrite dans une série de fichiers sur Cloud Storage.
Un exemple de jeu de données pour une application d'écoute de musique est fourni. Dans ce tutoriel, vous allez suivre les chansons écoutées, puis classer les cinq premières sur une période donnée.
Ce tutoriel est destiné aux utilisateurs techniques qui savent écrire du code et déployer des pipelines de données dans Google Cloud.
Objectifs
Ce tutoriel vous explique comment effectuer les tâches suivantes :
- Créer une table Bigtable avec un flux de modifications activé.
- Déployer un pipeline sur Dataflow qui transforme et génère le flux de modifications.
- Afficher les résultats de votre pipeline de données.
Coûts
Dans ce tutoriel, vous utilisez les composants facturables de suivants Google Cloud:
Obtenez une estimation des coûts en fonction de votre utilisation prévue,
utilisez le simulateur de coût.
Une fois que vous avez terminé les tâches décrites dans ce document, supprimez les ressources que vous avez créées pour éviter que des frais vous soient facturés. Pour en savoir plus, consultez la section Effectuer un nettoyage.
Avant de commencer
- Sélectionner un projet : la sélection d'un projet ne nécessite pas de rôle IAM spécifique Vous pouvez sélectionner n'importe quel projet pour lequel un rôle vous a été attribué.
-
Créer un projet : pour créer un projet, vous avez besoin du rôle Créateur de projet
(
roles/resourcemanager.projectCreator), qui contient l'autorisationresourcemanager.projects.create. Découvrez comment attribuer des rôles. -
Créez un Google Cloud projet :
gcloud projects create PROJECT_ID
Remplacez
PROJECT_IDpar un nom pour le Google Cloud projet que vous créez. -
Sélectionnez le Google Cloud projet que vous avez créé :
gcloud config set project PROJECT_ID
Remplacez
PROJECT_IDpar le nom de votre Google Cloud projet. - Sélectionner un projet : la sélection d'un projet ne nécessite pas de rôle IAM spécifique Vous pouvez sélectionner n'importe quel projet pour lequel un rôle vous a été attribué.
-
Créer un projet : pour créer un projet, vous avez besoin du rôle Créateur de projet
(
roles/resourcemanager.projectCreator), qui contient l'autorisationresourcemanager.projects.create. Découvrez comment attribuer des rôles. -
Créez un Google Cloud projet :
gcloud projects create PROJECT_ID
Remplacez
PROJECT_IDpar un nom pour le Google Cloud projet que vous créez. -
Sélectionnez le Google Cloud projet que vous avez créé :
gcloud config set project PROJECT_ID
Remplacez
PROJECT_IDpar le nom de votre Google Cloud projet. - Mettez à jour et installez l'CLI
cbt.gcloud components update gcloud components install cbt
Installez la Google Cloud CLI. Une fois que la Google Cloud CLI est installée, initialisezla en exécutant la commande suivante :
gcloud initSi vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.
Créez ou sélectionnez un Google Cloud projet.
Rôles requis pour sélectionner ou créer un projet
Vérifiez que la facturation est activée pour votre Google Cloud projet.
Activez les API Dataflow, Cloud Bigtable, API Cloud Bigtable Admin et Cloud Storage :
Rôles requis pour activer les API
Pour activer les API, vous avez besoin du rôle IAM Administrateur d'utilisation du service (roles/serviceusage.serviceUsageAdmin), qui contient l'
serviceusage.services.enable autorisation. Découvrez comment attribuer
des rôles.
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
Installez la Google Cloud CLI. Une fois que la Google Cloud CLI est installée, initialisezla en exécutant la commande suivante :
gcloud initSi vous utilisez un fournisseur d'identité (IdP) externe, vous devez d'abord vous connecter à la gcloud CLI avec votre identité fédérée.
Créez ou sélectionnez un Google Cloud projet.
Rôles requis pour sélectionner ou créer un projet
Vérifiez que la facturation est activée pour votre Google Cloud projet.
Activez les API Dataflow, Cloud Bigtable, API Cloud Bigtable Admin et Cloud Storage :
Rôles requis pour activer les API
Pour activer les API, vous avez besoin du rôle IAM Administrateur d'utilisation du service (roles/serviceusage.serviceUsageAdmin), qui contient l'
serviceusage.services.enable autorisation. Découvrez comment attribuer
des rôles.
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
Préparer l'environnement
Obtenir le code
Clonez le dépôt contenant l'exemple de code. Si vous avez déjà téléchargé ce dépôt, extrayez-le pour obtenir la dernière version.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
Créer un bucket
gcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAME par un nom de bucket
qui répond aux conditions requises pour le nom des buckets.
Créer une instance Bigtable
Vous pouvez utiliser une instance existante pour ce tutoriel ou en créer une avec les configurations par défaut dans une région proche de vous.
Créer une table
L'exemple d'application suit les chansons écoutées par les utilisateurs et stocke les événements d'écoute dans Bigtable. Créez une table avec un flux de modifications activé, qui comporte une famille de colonnes (cf) et une colonne (song), et qui utilise des ID utilisateur pour les clés de ligne.
Créez la table.
gcloud bigtable instances tables create song-rank \
--column-families=cf --change-stream-retention-period=7d \
--instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
Remplacez les éléments suivants :
- PROJECT_ID : ID du projet que vous utilisez.
- BIGTABLE_INSTANCE_ID : ID de l'instance qui contiendra la nouvelle table.
Démarrer le pipeline
Ce pipeline transforme le flux de modifications en procédant comme suit :
- Lit le flux de modifications.
- Obtient le nom de la chanson.
- Regroupe les événements d'écoute de la chanson dans des fenêtres de N secondes.
- Compte les cinq premières chansons.
- Génère les résultats.
Exécutez le pipeline.
mvn compile exec:java -Dexec.mainClass=SongRank \
"-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
--bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
--outputLocation=gs://BUCKET_NAME/ \
--runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
Remplacez BIGTABLE_REGION par l'ID de la région dans laquelle se trouve votre instance Bigtable, par exemple us-east5.
Comprendre le pipeline
Les extraits de code suivants du pipeline peuvent vous aider à comprendre le code que vous exécutez.
Lecture du flux de modifications
Le code de cet exemple configure le flux source avec les paramètres de l'instance et de la table Bigtable spécifiques.
Obtenir le nom de la chanson
Lorsqu'une chanson est écoutée, son nom est écrit dans la famille de colonnes cf et le qualificatif de colonne song. Le code extrait donc la valeur de la mutation du flux de modifications et la génère à l'étape suivante du pipeline.
Compter les cinq premières chansons
Vous pouvez utiliser les fonctions Beam intégrées Count et Top.of pour obtenir les cinq premières chansons de la fenêtre actuelle.
Générer les résultats
Ce pipeline écrit les résultats dans la sortie standard ainsi que dans des fichiers. Pour les fichiers, il regroupe les écritures en groupes de 10 éléments ou en segments d'une minute.
Afficher le pipeline
Dans la Google Cloud console, accédez à la page Dataflow.
Cliquez sur le job dont le nom commence par song-rank.
En bas de l'écran, cliquez sur Afficher pour ouvrir le panneau des journaux.
Cliquez sur Journaux du nœud de calcul pour surveiller les journaux de sortie du flux de modifications.
Écritures en flux continu
Utilisez l'interface de ligne de commande
cbt CLI
pour écrire un certain nombre d'écoutes de chansons pour différents utilisateurs dans
la table song-rank. Cette opération est conçue pour s'effectuer sur quelques minutes afin de simuler la diffusion en flux continu des écoutes de chansons au fil du temps.
cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
song-rank song-rank-data.csv column-family=cf batch-size=1
Consulter le résultat
Lisez la sortie sur Cloud Storage pour voir les chansons les plus populaires.
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
Exemple de résultat :
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
Effectuer un nettoyage
Pour éviter que les ressources utilisées lors de ce tutoriel soient facturées sur votre compte Google Cloud, supprimez le projet contenant les ressources, ou conservez le projet et supprimez les ressources individuelles.
Supprimer le projet
Supprimez un Google Cloud projet :
gcloud projects delete PROJECT_ID
Supprimer des ressources individuelles
Supprimez le bucket et les fichiers.
gcloud storage rm --recursive gs://BUCKET_NAME/Désactivez le flux de modifications dans la table.
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-periodSupprimez la table
song-rank.cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rankArrêtez le pipeline de flux de modifications.
Répertoriez les jobs pour obtenir l'ID de job.
gcloud dataflow jobs list --region=BIGTABLE_REGIONAnnulez le job.
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGIONRemplacez JOB_ID par l'ID de job affiché après la commande précédente.