Cette page décrit les caractéristiques de performances des jobs de streaming Dataflow qui lisent des données depuis Apache Kafka et les écrivent dans BigQuery. Il fournit des résultats de tests de référence pour les pipelines de carte uniquement, qui effectuent des transformations par message sans suivre l'état ni regrouper les éléments du flux.
De nombreuses charges de travail d'intégration de données, y compris l'ETL, la validation de champs et le mappage de schémas, appartiennent à la catégorie "map-only". Si votre pipeline suit ce modèle, vous pouvez utiliser ces benchmarks pour évaluer votre job Dataflow par rapport à une configuration de référence performante.
Méthodologie de test
Les benchmarks ont été effectués à l'aide des ressources suivantes :
Un cluster Managed Service pour Apache Kafka. Les messages ont été générés à l'aide du modèle Générateur de données de streaming.
- Fréquence des messages : environ 1 000 000 de messages par seconde
- Charge d'entrée : 1 Gio/s
- Format du message : texte JSON généré de manière aléatoire avec un schéma fixe
- Taille du message : environ 1 Kio par message
- Partitions Kafka : 1 000
Une table BigQuery standard.
Un pipeline de streaming Dataflow qui utilise le modèle Apache Kafka vers BigQuery. Ce pipeline effectue l'analyse et le mappage de schéma minimaux requis. Aucune fonction définie par l'utilisateur (UDF) personnalisée n'a été utilisée.
Une fois le scaling horizontal stabilisé et le pipeline en régime permanent, les pipelines ont été autorisés à s'exécuter pendant environ une journée, après quoi les résultats ont été collectés et analysés.
Pipeline Dataflow
Ce benchmark utilise un pipeline de mappage uniquement qui effectue un mappage et une conversion simples des messages JSON. Le pipeline a été testé en mode "exactement une fois" et en mode "au moins une fois". Le traitement "au moins une fois" offre un meilleur débit. Toutefois, il ne doit être utilisé que lorsque les enregistrements en double sont acceptables ou que le récepteur en aval gère la déduplication.
Configuration du job
Le tableau suivant montre comment les jobs Dataflow ont été configurés.
| Paramètre | Valeur |
|---|---|
| Type de machine des nœuds de calcul | e2-standard-2 |
| Processeurs virtuels de la machine de nœud de calcul | 2 |
| RAM des machines des nœuds de calcul | 8 Go |
| Disque persistant de la machine de nœud de calcul | Disque persistant standard (HDD), 30 Go |
| Nombre maximal de nœuds de calcul | 120 |
| Streaming Engine | Oui |
| Autoscaling horizontal | Oui |
| Modèle de facturation | Facturation basée sur les ressources |
| L'API Storage Write est-elle activée ? | Oui |
| Flux de l'API Storage Write | 400 |
| Fréquence de déclenchement de l'API Storage Write | 5 secondes |
| Format des messages | JSON |
| Mode d'authentification Kafka |
Identifiants par défaut de l'application (ADC). Pour en savoir plus, consultez Types d'authentification pour les courtiers Kafka. |
L'API BigQuery Storage Write est recommandée pour les pipelines de traitement en flux continu. Lorsque vous utilisez le mode "exactement une fois" avec l'API Storage Write, vous pouvez ajuster les paramètres suivants :
Nombre de flux d'écriture. Pour garantir un parallélisme de clé suffisant dans la phase d'écriture, définissez le nombre de flux de l'API Storage Write sur une valeur supérieure au nombre de processeurs de nœud de calcul, tout en suivant les recommandations concernant le débit par flux.
Fréquence de déclenchement : Une valeur de seconde à un chiffre convient aux pipelines à haut débit.
Pour en savoir plus, consultez Écrire des données depuis Dataflow vers BigQuery.
Une attention particulière doit également être accordée au nombre de partitions Apache Kafka. Pour garantir un parallélisme de clé suffisant lors de la phase de lecture, le nombre de partitions doit au moins être égal au nombre total de vCPU de nœud de calcul. Pour en savoir plus, consultez Lire des données depuis Apache Kafka vers Dataflow.
Résultats du benchmark
Cette section décrit les résultats des tests de référence.
Débit et utilisation des ressources
Le tableau suivant présente les résultats des tests pour le débit du pipeline et l'utilisation des ressources.
| Résultat | Une seule fois | Au moins une fois |
|---|---|---|
| Débit d'entrée par nœud de calcul | Moyenne : 15 Mo/s, n=3 | Moyenne : 18 Mbit/s, n=3 |
| Utilisation moyenne du processeur sur tous les nœuds de calcul | Moyenne : 70 %, n=3 | Moyenne : 75 %, n=3 |
| Nombre de nœuds de calcul | Moyenne : 63, n=3 | Moyenne : 53, n=3 |
| Unités de calcul Streaming Engine par heure | Moyenne : 58, n=3 | Moyenne : 0, n=3 |
L'algorithme d'autoscaling peut avoir une incidence sur le niveau d'utilisation du processeur cible. Pour atteindre une utilisation cible du processeur plus ou moins élevée, vous pouvez définir la plage d'autoscaling ou l'indication d'utilisation des nœuds de calcul. Des cibles d'utilisation plus élevées peuvent entraîner une baisse des coûts, mais aussi une latence de queue plus élevée, en particulier pour les charges variables.
Latence
Le tableau suivant présente les résultats de référence pour la latence du pipeline en mode "exactement une fois", à l'exclusion de l'étape d'entrée.
| Latence de bout en bout totale de l'étape, à l'exclusion de l'étape d'entrée | Une seule fois |
|---|---|
| P50 | Moyenne : 1 200 ms, n=3 |
| P95 | Moyenne : 3 000 ms, n=3 |
| P99 | Moyenne : 5 400 ms, n=3 |
Les tests ont mesuré la latence de bout en bout par étape (métrique job/streaming_engine/stage_end_to_end_latencies) lors de trois exécutions de tests de longue durée. Cette métrique mesure le temps passé par le moteur de streaming dans chaque étape du pipeline. Il englobe toutes les étapes internes du pipeline, telles que :
- Mélanger et mettre en file d'attente les messages à traiter
- Temps de traitement réel (par exemple, conversion des messages en objets de ligne)
- Écriture de l'état persistant, ainsi que temps passé dans la file d'attente pour écrire l'état persistant
En raison d'une limitation de la métrique, la latence de l'étape d'entrée n'est pas indiquée. Par conséquent, il n'est pas inclus dans le total.
Les benchmarks présentés ici représentent une référence. La latence est très sensible à la complexité du pipeline. Les UDF personnalisées, les transformations supplémentaires et la logique de fenêtrage complexe peuvent toutes augmenter la latence.
Coûts estimés
Vous pouvez estimer le coût de référence de votre propre pipeline comparable avec la facturation basée sur les ressources à l'aide du simulateur de coût Google Cloud Platform, comme suit :
- Ouvrez le simulateur de coût.
- Cliquez sur Ajouter à l'estimation.
- Sélectionnez "Dataflow".
- Pour Type de service, sélectionnez "Dataflow Classic".
- Sélectionnez Paramètres avancés pour afficher l'ensemble des options.
- Choisissez l'emplacement où le job s'exécute.
- Dans le champ Job type (Type de job), sélectionnez "Streaming".
- Sélectionnez Activer Streaming Engine.
- Saisissez les informations concernant les heures d'exécution du job, les nœuds de calcul, les machines de nœud de calcul et le stockage sur disque persistant.
- Saisissez le nombre estimé d'unités de calcul Streaming Engine.
L'utilisation des ressources et les coûts évoluent à peu près de manière linéaire avec le débit d'entrée. Toutefois, pour les petits jobs avec seulement quelques nœuds de calcul, le coût total est dominé par les coûts fixes. Pour commencer, vous pouvez extrapoler le nombre de nœuds de calcul et la consommation de ressources à partir des résultats du benchmark.
Par exemple, supposons que vous exécutiez un pipeline de type "map-only" en mode "exactement une fois", avec un débit de données d'entrée de 100 Mio/s. Sur la base des résultats du benchmark pour un pipeline de 1 Gio/s, vous pouvez estimer les besoins en ressources comme suit :
- Facteur de scaling : (100 Mio/s) / (1 Gio/s) = 0,1
- Nombre de nœuds de calcul prévus : 63 nœuds de calcul × 0,1 = 6,3 nœuds de calcul
- Nombre prévu d'unités de calcul Streaming Engine par heure : 58 × 0,1 = 5,8 unités par heure
Cette valeur ne doit être utilisée que comme estimation initiale. Le débit et le coût réels peuvent varier considérablement en fonction de facteurs tels que le type de machine, la distribution de la taille des messages, le code utilisateur, le type d'agrégation, le parallélisme des clés et la taille de la fenêtre. Pour en savoir plus, consultez Bonnes pratiques pour optimiser les coûts Dataflow.
Exécuter un pipeline de test
Cette section présente les commandes gcloud dataflow flex-template run qui ont été utilisées pour exécuter le pipeline de mappage uniquement.
Mode "exactement une fois"
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
--enable-streaming-engine \
--parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400
Mode "au moins une fois"
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
--enable-streaming-engine \
--additional-experiments=streaming_mode_at_least_once \
--parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400,\
useStorageWriteApiAtLeastOnce=true
Remplacez les éléments suivants :
JOB_NAME: nom de la tâche DataflowPROJECT_ID: ID du projetKAFKA_BOOTSTRAP_ADDRESS: adresse d'amorçage du cluster Apache KafkaKAFKA_TOPIC: nom du sujet KafkaBQ_DATASET: nom de l'ensemble de données BigQueryBQ_TABLE_NAME: nom de la table BigQuery
Générer des données de test
Pour générer des données de test, exécutez le modèle de générateur de données de streaming à l'aide de la commande suivante :
gcloud dataflow flex-template run JOB_NAME \
--project=PROJECT_ID \
--template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
--max-workers=140 \
--parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=1000000,\
sinkType=KAFKA,\
bootstrapServer=KAFKA_BOOTSTRAP_ADDRESS,\
kafkaTopic=KAFKA_TOPIC,\
outputType=JSON
Remplacez les éléments suivants :
JOB_NAME: nom de la tâche DataflowPROJECT_ID: ID du projetSCHEMA_LOCATION: chemin d'accès à un fichier de schéma dans Cloud StorageKAFKA_BOOTSTRAP_ADDRESS: adresse d'amorçage du cluster Apache KafkaKAFKA_TOPIC: nom du sujet Kafka
Le modèle "Générateur de données de streaming" utilise un fichier JSON Data Generator pour définir le schéma du message. Les tests de référence ont utilisé un schéma de message semblable à celui-ci :
{ "logStreamId": "{{integer(1000001,2000000)}}", "message": "{{alphaNumeric(962)}}" }
Étapes suivantes
- Utiliser l'interface de surveillance des jobs Dataflow
- Bonnes pratiques pour optimiser les coûts Dataflow
- Résoudre les problèmes liés aux jobs de traitement de flux lents ou bloqués
- Lire des données depuis Apache Kafka vers Dataflow
- Écrire des données depuis Dataflow vers BigQuery