Créer un pipeline de traitement en flux continu à l'aide d'un modèle Dataflow
Ce guide de démarrage rapide explique comment créer un pipeline de traitement en flux continu à l'aide d'un modèle Dataflow fourni par Google. Plus précisément, nous utiliserons ici le modèle Pub/Sub vers BigQuery à titre d'exemple.
Le modèle Pub/Sub vers BigQuery est un pipeline de flux de données capable de lire des messages au format JSON à partir d'un sujet Pub/Sub et de les écrire dans une table BigQuery.
Pour obtenir des instructions détaillées sur cette tâche directement dans la Google Cloud console, cliquez sur Visite guidée:
Avant de commencer
Effectuez les étapes suivantes avant d'exécuter votre pipeline.
Configurer votre projet
- Connectez-vous à votre Google Cloud compte. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits sans frais pour exécuter, tester et déployer des charges de travail.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
Roles required to select or create a project
- Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
-
Create a project: To create a project, you need the Project Creator role
(
roles/resourcemanager.projectCreator), which contains theresourcemanager.projects.createpermission. Learn how to grant roles.
-
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.
Roles required to enable APIs
To enable APIs, you need the Service Usage Admin IAM role (
roles/serviceusage.serviceUsageAdmin), which contains theserviceusage.services.enablepermission. Learn how to grant roles.
Rôles requis
Pour suivre ce guide de démarrage rapide, vous avez besoin des rôles Identity and Access Management (IAM) suivants.
Pour obtenir les autorisations nécessaires pour suivre ce guide de démarrage rapide, demandez à votre administrateur de vous accorder les rôles IAM suivants sur votre projet :
- Utilisateur BigQuery (
roles/bigquery.user) - Administrateur Dataflow (
roles/dataflow.admin) - Utilisateur du compte de service (
roles/iam.serviceAccountUser) - Administrateur de l'espace de stockage (
roles/storage.admin)
Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.
Vous pouvez également obtenir les autorisations requises avec des rôles personnalisés ou d'autres rôles prédéfinis.
Pour vous assurer que le compte de service Compute Engine par défaut dispose des autorisations nécessaires pour exécuter la tâche Dataflow, demandez à votre administrateur d'attribuer les rôles IAM suivants au compte de service Compute Engine par défaut dans votre projet :
- Éditeur de données BigQuery (
roles/bigquery.dataEditor) - Nœud de calcul Dataflow (
roles/dataflow.worker) - Éditeur Pub/Sub (
roles/pubsub.editor) - Administrateur des objets de l'espace de stockage (
roles/storage.objectAdmin) - Lecteur (
roles/viewer)
Pour en savoir plus sur l'attribution de rôles, consultez la page Gérer l'accès aux projets, aux dossiers et aux organisations.
Votre administrateur peut également attribuer au compte de service Compute Engine par défaut les autorisations requises via des rôles personnalisés ou d'autres rôles prédéfinis.
Créer un bucket Cloud Storage
Avant de pouvoir exécuter un pipeline, vous devez créer un bucket Cloud Storage.
Créez un bucket Cloud Storage :
- Dans la Google Cloud console, accédez à la page Buckets de Cloud Storage.
- Cliquez sur Créer.
- Sur la page Créer un bucket, saisissez les informations concernant votre bucket. Pour passer à l'étape suivante, cliquez sur Continuer.
- Pour Nommer votre bucket, saisissez un nom unique. N'incluez aucune information sensible dans le nom des buckets, car leur espace de noms est global et visible par tous.
-
Dans la section Choisir l'emplacement de stockage de vos données, procédez comme suit :
- Sélectionnez un type d'emplacement.
- Choisissez un emplacement où les données de votre bucket seront stockées de manière permanente dans le menu déroulant Type d'emplacement.
- Si vous sélectionnez le type d'emplacement birégional, vous pouvez également choisir d'activer la réplication turbo à l'aide de la case à cocher correspondante.
- Pour configurer la réplication entre buckets, sélectionnez
Ajouter une réplication entre buckets via le service de transfert de stockage et
procédez comme suit :
Configurer la réplication entre buckets
- Dans le menu Bucket, sélectionnez un bucket.
Dans la section Paramètres de réplication , cliquez sur Configurer pour configurer les paramètres du job de réplication.
Le volet Configurer la réplication entre buckets s'affiche.
- Pour filtrer les objets à répliquer en fonction du préfixe de leur nom, saisissez le préfixe avec lequel vous souhaitez inclure ou exclure des objets, puis cliquez sur Ajouter un préfixe.
- Pour définir une classe de stockage pour les objets répliqués, sélectionnez-en une dans le menu Classe de stockage. Si vous ignorez cette étape, les objets répliqués utiliseront la classe de stockage par défaut du bucket de destination.
- Cliquez sur OK.
-
Dans la section Choisir comment stocker vos données, procédez comme suit :
- Dans la section Définir une classe par défaut, sélectionnez Standard.
- Pour activer l'espace de noms hiérarchique, dans la section Optimiser le stockage pour les charges de travail utilisant beaucoup de données, sélectionnez Activer l'espace de noms hiérarchique sur ce bucket.
- Dans la section Choisir comment contrôler l'accès aux objets, indiquez si votre bucket applique ou non la protection contre l'accès public, et sélectionnez une méthode de contrôle des accès pour les objets de votre bucket.
-
Dans la section Choisir comment protéger les données d'objet, procédez comme suit :
- Sous Protection des données , sélectionnez les options que vous
voulez définir pour votre bucket.
- Pour activer la suppression réversible, cochez la case Règle de suppression réversible (pour la récupération de données), puis spécifiez le nombre de jours pendant lesquels vous souhaitez conserver les objets après leur suppression.
- Pour définir la gestion des versions d'objets, cochez la case Gestion des versions d'objets (pour le contrôle des versions), puis spécifiez le nombre maximal de versions par objet et le nombre de jours après lesquels les versions obsolètes expirent.
- Pour activer la règle de conservation sur les objets et les buckets, cochez la case Conservation (pour la conformité), puis procédez comme suit :
- Pour activer le verrouillage de la conservation des objets, cochez la case Activer la conservation des objets.
- Pour activer le verrouillage de bucket, cochez la case Définir une règle de conservation de bucket, puis choisissez une unité de temps et une durée pour votre période de conservation.
- Pour choisir comment vos données d'objet seront chiffrées, développez la section Chiffrement des données (), puis sélectionnez une méthode de chiffrement des données.
- Sous Protection des données , sélectionnez les options que vous
voulez définir pour votre bucket.
- Cliquez sur Créer.
Copiez les éléments suivants, car vous en aurez besoin dans une section ultérieure :
- Le nom de votre bucket Cloud Storage.
- ID de votre Google Cloud projet.
Pour trouver cet ID, consultez la section Identifier des projets.
Réseau VPC
Par défaut, chaque nouveau projet démarre avec un réseau
par défaut. Si le réseau par défaut de votre
projet est désactivé ou a été supprimé, vous devez
disposer d'un réseau dans votre projet pour lequel votre compte utilisateur dispose du rôle Utilisateur de réseau
de Compute
(roles/compute.networkUser).
Créer un ensemble de données et une table BigQuery
Créez un ensemble de données et une table BigQuery selon le schéma approprié à votre sujet Pub/Sub à l'aide de la Google Cloud console.
Dans cet exemple, le nom de l'ensemble de données est taxirides et le nom de la table est realtime. Pour créer cet ensemble de données et cette table, procédez comme suit :
- Accédez à la page BigQuery.
Accéder à BigQuery - Dans le panneau Explorateur, à côté du projet dans lequel vous souhaitez créer l' ensemble de données, cliquez sur Afficher les actions, puis cliquez sur Créer un ensemble de données.
- Dans le panneau Créer un ensemble de données, procédez comme suit :
- Dans le champ ID de l'ensemble de données, saisissez
taxirides. Les ID des ensembles de données sont uniques pour chaque Google Cloud projet. - Pour Type d'emplacement, choisissez Multirégional, puis sélectionnez US (plusieurs régions aux États-Unis). Les ensembles de données publics sont stockés dans l'emplacement multirégional
US. Par souci de simplicité, utilisez le même emplacement pour votre ensemble de données. - Conservez les autres paramètres par défaut, puis cliquez sur Créer un ensemble de données.
- Dans le panneau
Explorateur , développez votre projet. - À côté de votre
taxiridesensemble de données, cliquez sur Afficher les actions, puis cliquez sur Créer une table. - Dans le panneau Créer une table, procédez comme suit :
- Dans la section Source, sous Créer une table à partir de, sélectionnez Table vide.
- Pour le champ Table de la section Destination, saisissez
realtime. - Dans la section Schéma, cliquez sur le bouton Modifier sous forme de texte et collez la définition de schéma suivante dans la zone :
ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp, meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
- Dans la section Paramètres de partitionnement et de clustering, sous Partitionnement, sélectionnez le champ Code temporel.
- Ne modifiez aucun autre paramètre par défaut et cliquez sur Créer une table.
Exécuter le pipeline
Exécutez un pipeline de flux de données à l'aide du modèle Pub/Sub vers BigQuery fourni par Google. Le pipeline reçoit des données entrantes à partir du sujet d'entrée.
- Accédez à la page Tâches Dataflow.
Accéder aux tâches - Cliquez sur
Créer une tâche à partir d'un modèle . - Saisissez
taxi-datacomme nom de la tâche pour votre tâche Dataflow. - Pour Modèle Dataflow, sélectionnez le modèle Pub/Sub vers BigQuery.
- Dans le champ Table BigQuery de sortie, saisissez ce qui suit :
PROJECT_ID:taxirides.realtime
Remplacez
PROJECT_IDpar l'ID du projet dans lequel vous avez créé l'ensemble de données BigQuery. - Dans la section Paramètres source facultatifs, cliquez sur Saisir le sujet manuellement pour le champ Sujet Pub/Sub d'entrée.
- Dans la boîte de dialogue, saisissez les éléments suivants comme Nom du sujet, puis cliquez sur Enregistrer :
projects/pubsub-public-data/topics/taxirides-realtime
Ce sujet Pub/Sub disponible publiquement est basé sur l' ensemble de données ouvert de la NYC Taxi & Limousine Commission. Voici un exemple de message de cet article au format JSON :
{ "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e", "point_idx": 217, "latitude": 40.75399, "longitude": -73.96302, "timestamp": "2021-03-08T02:29:09.66644-05:00", "meter_reading": 6.293821, "meter_increment": 0.029003782, "ride_status": "enroute", "passenger_count": 1 }
- Dans le champ Emplacement temporaire, saisissez ce qui suit :
gs://BUCKET_NAME/temp/
Remplacez
BUCKET_NAMEpar le nom de votre bucket Cloud Storage. Le dossiertempstocke les fichiers temporaires, comme la tâche de pipeline en préproduction. - Si votre projet ne possède pas de réseau par défaut, saisissez un réseau et un sous-réseau. Pour plus d'informations, consultez la section Spécifier un réseau et un sous-réseau.
- Cliquez sur Run Job (Exécuter la tâche).
Afficher les résultats
Pour afficher les données écrites dans la tablerealtime, procédez comme suit :
Accédez à la page BigQuery.
Cliquez sur Saisir une nouvelle requête. Un onglet Éditeur s'affiche.
SELECT * FROM `PROJECT_ID.taxirides.realtime` WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY) LIMIT 1000
Remplacez
PROJECT_IDpar l'ID du projet dans lequel vous avez créé l'ensemble de données BigQuery. L'affichage des données dans votre table peut prendre jusqu'à cinq minutes.Cliquez sur Exécuter.
La requête renvoie les lignes qui ont été ajoutées à votre table au cours des dernières 24 heures. Vous pouvez également exécuter des requêtes en SQL standard.
Libérer de l'espace
Pour éviter que les ressources utilisées dans cette démonstration soient facturées sur votre Google Cloud compte pour les ressources utilisées sur cette page, procédez comme suit :
Supprimer le projet
Le moyen le plus simple d'empêcher la facturation est de supprimer le Google Cloud projet que vous avez créé pour ce guide de démarrage rapide.- Dans la Google Cloud console, accédez à la page Gérer les ressources.
- Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer.
- Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez Arrêter pour supprimer le projet.
Supprimer les ressources individuelles
Si vous souhaitez conserver le Google Cloud projet que vous avez utilisé dans ce guide de démarrage rapide, supprimez les ressources individuelles :
- Accédez à la page Tâches Dataflow.
Accéder aux tâches - Sélectionnez votre tâche de traitement par flux dans la liste des tâches.
- Dans la barre de navigation, cliquez sur Arrêter.
- Dans la boîte de dialogue Arrêter la tâche, annulez ou drainez votre pipeline, puis cliquez sur Arrêter la tâche.
- Accédez à la page BigQuery.
Accéder à BigQuery - Sur le panneau Explorateur, développez votre projet.
- À côté de l'ensemble de données que vous souhaitez supprimer, cliquez sur Afficher les actions, puis sur Ouvrir.
- Dans le panneau des détails, cliquez sur Supprimer l'ensemble de données, puis suivez les instructions.
- Dans la Google Cloud console, accédez à la page Buckets de Cloud Storage.
- Cochez la case correspondant au bucket que vous souhaitez supprimer.
- Pour supprimer le bucket, cliquez Supprimer, puis suivez les instructions.