Cette page décrit les caractéristiques de performances des jobs de streaming Dataflow qui lisent des données depuis Pub/Sub et les écrivent dans BigQuery. Il fournit les résultats des tests de référence pour deux types de pipelines de streaming :
Carte uniquement (transformation par message) : pipelines qui effectuent des transformations par message, sans suivre l'état ni regrouper les éléments du flux. Par exemple, l'ETL, la validation des champs et le mappage de schéma.
Agrégation avec fenêtres (
GroupByKey) : pipelines qui effectuent des opérations avec état et regroupent les données en fonction d'une clé et d'une fenêtre temporelle. Par exemple, il peut s'agir de compter des événements, de calculer des sommes et de collecter des enregistrements pour une session utilisateur.
La plupart des charges de travail pour l'intégration de données en flux continu appartiennent à ces deux catégories. Si votre pipeline suit un schéma similaire, vous pouvez utiliser ces benchmarks pour évaluer votre job Dataflow par rapport à une configuration de référence performante.
Méthodologie de test
Les benchmarks ont été effectués à l'aide des ressources suivantes :
Sujet Pub/Sub préprovisionné avec une charge d'entrée constante. Les messages ont été générés à l'aide du modèle Générateur de données de streaming.
- Fréquence des messages : environ 1 000 000 de messages par seconde
- Charge d'entrée : 1 Gio/s
- Format du message : texte JSON généré de manière aléatoire avec un schéma fixe
- Taille du message : environ 1 Kio par message
Une table BigQuery standard.
Pipelines de streaming Dataflow basés sur le modèle Pub/Sub vers BigQuery. Ces pipelines effectuent l'analyse et le mappage de schéma minimaux requis. Aucune fonction définie par l'utilisateur (UDF) personnalisée n'a été utilisée.
Une fois le scaling horizontal stabilisé et le pipeline arrivé à un état stable, les pipelines ont été autorisés à s'exécuter pendant environ une journée, après quoi les résultats ont été collectés et analysés.
Pipelines Dataflow
Deux variantes de pipeline ont été testées :
Pipeline de carte uniquement. Ce pipeline effectue un mappage et une conversion simples des messages JSON. Pour ce test, le modèle Pub/Sub vers BigQuery a été utilisé sans modification.
- Sémantique : le pipeline a été testé avec le mode "exactement une fois" et le mode "au moins une fois". Le traitement "au moins une fois" offre un meilleur débit. Toutefois, il ne doit être utilisé que lorsque les enregistrements en double sont acceptables ou que le récepteur en aval gère la déduplication.
Pipeline d'agrégation par fenêtre. Ce pipeline regroupe les messages par clé spécifique dans des fenêtres de taille fixe et écrit les enregistrements agrégés dans BigQuery. Pour ce test, un pipeline Apache Beam personnalisé basé sur le modèle Pub/Sub vers BigQuery a été utilisé.
Logique d'agrégation : pour chaque fenêtre d'une minute fixe et non chevauchante, les messages ayant la même clé ont été collectés et écrits sous forme d'enregistrement agrégé unique dans BigQuery. Ce type d'agrégation est couramment utilisé dans le traitement des journaux pour combiner des événements associés, tels que l'activité d'un utilisateur, en un seul enregistrement pour l'analyse en aval.
Parallélisme de clés : le benchmark a utilisé 1 000 000 de clés réparties de manière uniforme.
Sémantique : le pipeline a été testé en mode "exactement une fois". Les agrégations nécessitent une sémantique de type "exactement une fois" pour garantir l'exactitude et éviter le double comptage dans un groupe et une fenêtre.
Configuration du job
Le tableau suivant montre comment les jobs Dataflow ont été configurés.
| Paramètre | Mappage uniquement, exactement une fois | Map only, at-least-once | Agrégation fenêtrée, exactement une fois |
|---|---|---|---|
| Type de machine des nœuds de calcul | n1-standard-2 |
n1-standard-2 |
n1-standard-2 |
| Processeurs virtuels de la machine de nœud de calcul | 2 | 2 | 2 |
| RAM des machines des nœuds de calcul | 7,5 Gio | 7,5 Gio | 7,5 Gio |
| Disque persistant de la machine de nœud de calcul | Disque persistant standard (HDD), 30 Go | Disque persistant standard (HDD), 30 Go | Disque persistant standard (HDD), 30 Go |
| Nœuds de calcul initiaux | 70 | 30 | 180 |
| Nombre maximal de nœuds de calcul | 100 | 100 | 250 |
| Streaming Engine | Oui | Oui | Oui |
| Autoscaling horizontal | Oui | Oui | Oui |
| Modèle de facturation | Facturation basée sur les ressources | Facturation basée sur les ressources | Facturation basée sur les ressources |
| L'API Storage Write est-elle activée ? | Oui | Oui | Oui |
| Flux de l'API Storage Write | 200 | Non applicable | 500 |
| Fréquence de déclenchement de l'API Storage Write | 5 secondes | Non applicable | 5 secondes |
L'API BigQuery Storage Write est recommandée pour les pipelines de traitement en flux continu. Lorsque vous utilisez le mode "exactement une fois" avec l'API Storage Write, vous pouvez ajuster les paramètres suivants :
Nombre de flux d'écriture. Pour garantir un parallélisme de clé suffisant lors de l'étape d'écriture, définissez le nombre de flux de l'API Storage Write sur une valeur supérieure au nombre de processeurs de nœud de calcul, tout en maintenant un niveau raisonnable de débit de flux d'écriture BigQuery.
Fréquence de déclenchement : Une valeur de seconde à un chiffre convient aux pipelines à haut débit.
Pour en savoir plus, consultez Écrire des données depuis Dataflow vers BigQuery.
Résultats du benchmark
Cette section décrit les résultats des tests de référence.
Débit et utilisation des ressources
Le tableau suivant présente les résultats des tests pour le débit du pipeline et l'utilisation des ressources.
| Résultat | Mappage uniquement, exactement une fois | Map only, at-least-once | Agrégation fenêtrée, exactement une fois |
|---|---|---|---|
| Débit d'entrée par nœud de calcul | Moyenne : 17 Mo/s, n=3 | Moyenne : 21 Mbit/s, n=3 | Moyenne : 6 Mbit/s, n=3 |
| Utilisation moyenne du processeur sur tous les nœuds de calcul | Moyenne : 65 %, n=3 | Moyenne : 69 %, n=3 | Moyenne : 80 %, n=3 |
| Nombre de nœuds de calcul | Moyenne : 57, n=3 | Moyenne : 48, n=3 | Moyenne : 169, n=3 |
| Unités de calcul Streaming Engine par heure | Moyenne : 125, n=3 | Moyenne : 46, n=3 | Moyenne : 354, n=3 |
L'algorithme d'autoscaling peut avoir une incidence sur le niveau d'utilisation du processeur cible. Pour atteindre un objectif d'utilisation du processeur plus ou moins élevé, vous pouvez définir la plage d'autoscaling ou l'indication d'utilisation des nœuds de calcul. Des objectifs d'utilisation plus élevés peuvent entraîner une baisse des coûts, mais aussi une latence de queue plus élevée, en particulier pour les charges variables.
Pour un pipeline d'agrégation de fenêtre, le type d'agrégation, la taille de la fenêtre et le parallélisme des clés peuvent avoir un impact important sur l'utilisation des ressources.
Latence
Le tableau suivant présente les résultats du benchmark pour la latence du pipeline.
| Latence totale de bout en bout de l'étape | Mappage uniquement, exactement une fois | Map only, at-least-once | Agrégation fenêtrée, exactement une fois |
|---|---|---|---|
| P50 | Moyenne : 800 ms, n=3 | Moyenne : 160 ms, n=3 | Moyenne : 3 400 ms, n=3 |
| P95 | Moyenne : 2 000 ms, n=3 | Moyenne : 250 ms, n=3 | Moyenne : 13 000 ms, n=3 |
| P99 | Moyenne : 2 800 ms, n=3 | Moyenne : 410 ms, n=3 | Moyenne : 25 000 ms, n=3 |
Les tests ont mesuré la latence de bout en bout par étape (métrique job/streaming_engine/stage_end_to_end_latencies) lors de trois exécutions de tests de longue durée. Cette métrique mesure le temps passé par le moteur de streaming dans chaque étape du pipeline. Il englobe toutes les étapes internes du pipeline, telles que :
- Mélanger et mettre en file d'attente les messages à traiter
- Temps de traitement réel (par exemple, conversion des messages en objets de ligne)
- Écriture de l'état persistant, ainsi que temps passé dans la file d'attente pour écrire l'état persistant
La fraîcheur des données est une autre métrique de latence. Toutefois, la fraîcheur des données est affectée par des facteurs tels que le fenêtrage défini par l'utilisateur et les retards en amont dans la source. La latence système fournit une référence plus objective pour l'efficacité et l'état du traitement interne d'un pipeline sous charge.
Les données ont été mesurées sur environ une journée par exécution, les périodes de démarrage initiales étant supprimées afin de refléter des performances stables et à l'état d'équilibre. Les résultats montrent deux facteurs qui introduisent une latence supplémentaire :
Mode "exactement une fois". Pour obtenir une sémantique de type "exactement une fois", le mélange déterministe et les recherches d'état persistant sont nécessaires pour la déduplication. Le mode "Au moins une fois" est beaucoup plus rapide, car il ignore ces étapes.
Agrégation fenêtrée. Les messages doivent être entièrement mélangés, mis en mémoire tampon et écrits dans un état persistant avant la fermeture de la fenêtre, ce qui augmente la latence de bout en bout.
Les benchmarks présentés ici représentent une référence. La latence est très sensible à la complexité du pipeline. Les UDF personnalisées, les transformations supplémentaires et la logique de fenêtrage complexe peuvent tous augmenter la latence. Les agrégations simples à forte réduction, telles que la somme et le nombre, ont tendance à entraîner une latence plus faible que les opérations nécessitant beaucoup d'état, telles que la collecte d'éléments dans une liste.
Coûts estimés
Vous pouvez estimer le coût de référence de votre propre pipeline comparable avec la facturation basée sur les ressources à l'aide du simulateur de coût Google Cloud Platform, comme suit :
- Ouvrez le simulateur de coût.
- Cliquez sur Ajouter à l'estimation.
- Sélectionnez "Dataflow".
- Pour Type de service, sélectionnez "Dataflow Classic".
- Sélectionnez Paramètres avancés pour afficher l'ensemble des options.
- Choisissez l'emplacement où le job s'exécute.
- Dans le champ Type de job, sélectionnez "Streaming".
- Sélectionnez Activer Streaming Engine.
- Saisissez les informations concernant les heures d'exécution du job, les nœuds de calcul, les machines de nœud de calcul et le stockage sur disque persistant.
- Saisissez le nombre estimé d'unités de calcul Streaming Engine.
L'utilisation des ressources et les coûts évoluent de manière à peu près linéaire avec le débit d'entrée. Toutefois, pour les petits jobs avec seulement quelques nœuds de calcul, le coût total est dominé par les coûts fixes. Pour commencer, vous pouvez extrapoler le nombre de nœuds de calcul et la consommation de ressources à partir des résultats du benchmark.
Par exemple, supposons que vous exécutiez un pipeline de type "map-only" en mode "exactement une fois", avec un débit de données d'entrée de 100 Mio/s. Sur la base des résultats du benchmark pour un pipeline de 1 Gio/s, vous pouvez estimer les besoins en ressources comme suit :
- Facteur de scaling : (100 Mio/s) / (1 Gio/s) = 0,1
- Nombre de nœuds de calcul projeté : 57 nœuds de calcul × 0,1 = 5,7 nœuds de calcul
- Nombre prévu d'unités de calcul Streaming Engine par heure : 125 × 0,1 = 12,5 unités par heure
Cette valeur ne doit être utilisée qu'à titre d'estimation initiale. Le débit et le coût réels peuvent varier considérablement en fonction de facteurs tels que le type de machine, la distribution de la taille des messages, le code utilisateur, le type d'agrégation, le parallélisme des clés et la taille de la fenêtre. Pour en savoir plus, consultez Bonnes pratiques pour optimiser les coûts Dataflow.
Exécuter un pipeline de test
Cette section présente les commandes gcloud dataflow flex-template run qui ont été utilisées pour exécuter le pipeline de mappage uniquement.
Mode "exactement une fois"
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 70 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_IDsubscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true,\
numStorageWriteApiStreams=200 \
storageWriteApiTriggeringFrequencySec=5
Mode "au moins une fois"
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/PubSub_to_BigQuery_Flex \
--enable-streaming-engine \
--num-workers 30 \
--max-workers 100 \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
useStorageWriteApi=true \
--additional-experiments streaming_mode_at_least_once
Remplacez les éléments suivants :
JOB_ID: ID du job DataflowPROJECT_ID: ID du projetSUBSCRIPTION_NAME: nom de l'abonnement Pub/SubDATASET: nom de l'ensemble de données BigQueryTABLE_NAME: nom de la table BigQuery
Générer des données de test
Pour générer des données de test, exécutez le modèle de générateur de données de streaming à l'aide de la commande suivante :
gcloud dataflow flex-template run JOB_ID \
--template-file-gcs-location gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--num-workers 70 \
--max-workers 100 \
--parameters \
topic=projects/PROJECT_ID/topics/TOPIC_NAME,\
qps=1000000,\
maxNumWorkers=100,\
schemaLocation=SCHEMA_LOCATION
Remplacez les éléments suivants :
JOB_ID: ID du job DataflowPROJECT_ID: ID du projetTOPIC_NAME: nom du sujet Pub/SubSCHEMA_LOCATION: chemin d'accès à un fichier de schéma dans Cloud Storage
Le modèle "Générateur de données de streaming" utilise un fichier JSON Data Generator pour définir le schéma du message. Les tests de référence ont utilisé un schéma de message semblable à celui-ci :
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
Étapes suivantes
- Utiliser l'interface de surveillance des jobs Dataflow
- Bonnes pratiques pour optimiser les coûts Dataflow
- Résoudre les problèmes liés aux jobs de traitement de flux lents ou bloqués
- Lire des données depuis Pub/Sub vers Dataflow
- Écrire des données depuis Dataflow vers BigQuery