Caractéristiques de performances des pipelines Kafka vers BigQuery

Cette page décrit les caractéristiques de performances des jobs de streaming Dataflow qui lisent des données depuis Apache Kafka et les écrivent dans BigQuery. Il fournit des résultats de tests de référence pour les pipelines de carte uniquement, qui effectuent des transformations par message sans suivre l'état ni regrouper les éléments du flux.

De nombreuses charges de travail d'intégration de données, y compris l'ETL, la validation de champs et le mappage de schémas, appartiennent à la catégorie "map-only". Si votre pipeline suit ce modèle, vous pouvez utiliser ces benchmarks pour évaluer votre job Dataflow par rapport à une configuration de référence performante.

Méthodologie de test

Les benchmarks ont été effectués à l'aide des ressources suivantes :

  • Un cluster Managed Service pour Apache Kafka. Les messages ont été générés à l'aide du modèle Générateur de données de streaming.

    • Fréquence des messages : environ 1 000 000 de messages par seconde
    • Charge d'entrée : 1 Gio/s
    • Format du message : texte JSON généré de manière aléatoire avec un schéma fixe
    • Taille du message : environ 1 Kio par message
    • Partitions Kafka : 1 000
  • Une table BigQuery standard.

  • Un pipeline de streaming Dataflow qui utilise le modèle Apache Kafka vers BigQuery. Ce pipeline effectue l'analyse et le mappage de schéma minimaux requis. Aucune fonction définie par l'utilisateur (UDF) personnalisée n'a été utilisée.

Une fois le scaling horizontal stabilisé et le pipeline en régime permanent, les pipelines ont été autorisés à s'exécuter pendant environ une journée, après quoi les résultats ont été collectés et analysés.

Pipeline Dataflow

Ce benchmark utilise un pipeline de mappage uniquement qui effectue un mappage et une conversion simples des messages JSON. Le pipeline a été testé en mode "exactement une fois" et en mode "au moins une fois". Le traitement "au moins une fois" offre un meilleur débit. Toutefois, il ne doit être utilisé que lorsque les enregistrements en double sont acceptables ou que le récepteur en aval gère la déduplication.

Configuration du job

Le tableau suivant montre comment les jobs Dataflow ont été configurés.

Paramètre Valeur
Type de machine des nœuds de calcul e2-standard-2
Processeurs virtuels de la machine de nœud de calcul 2
RAM des machines des nœuds de calcul 8 Go
Disque persistant de la machine de nœud de calcul Disque persistant standard (HDD), 30 Go
Nombre maximal de nœuds de calcul 120
Streaming Engine Oui
Autoscaling horizontal Oui
Modèle de facturation Facturation basée sur les ressources
L'API Storage Write est-elle activée ? Oui
Flux de l'API Storage Write 400
Fréquence de déclenchement de l'API Storage Write 5 secondes
Format des messages JSON
Mode d'authentification Kafka

Identifiants par défaut de l'application (ADC).

Pour en savoir plus, consultez Types d'authentification pour les courtiers Kafka.

L'API BigQuery Storage Write est recommandée pour les pipelines de traitement en flux continu. Lorsque vous utilisez le mode "exactement une fois" avec l'API Storage Write, vous pouvez ajuster les paramètres suivants :

  • Nombre de flux d'écriture. Pour garantir un parallélisme de clé suffisant dans la phase d'écriture, définissez le nombre de flux de l'API Storage Write sur une valeur supérieure au nombre de processeurs de nœud de calcul, tout en suivant les recommandations concernant le débit par flux.

  • Fréquence de déclenchement : Une valeur de seconde à un chiffre convient aux pipelines à haut débit.

Pour en savoir plus, consultez Écrire des données depuis Dataflow vers BigQuery.

Une attention particulière doit également être accordée au nombre de partitions Apache Kafka. Pour garantir un parallélisme de clé suffisant lors de la phase de lecture, le nombre de partitions doit au moins être égal au nombre total de vCPU de nœud de calcul. Pour en savoir plus, consultez Lire des données depuis Apache Kafka vers Dataflow.

Résultats du benchmark

Cette section décrit les résultats des tests de référence.

Débit et utilisation des ressources

Le tableau suivant présente les résultats des tests pour le débit du pipeline et l'utilisation des ressources.

Résultat Une seule fois Au moins une fois
Débit d'entrée par nœud de calcul Moyenne : 15 Mo/s, n=3 Moyenne : 18 Mbit/s, n=3
Utilisation moyenne du processeur sur tous les nœuds de calcul Moyenne : 70 %, n=3 Moyenne : 75 %, n=3
Nombre de nœuds de calcul Moyenne : 63, n=3 Moyenne : 53, n=3
Unités de calcul Streaming Engine par heure Moyenne : 58, n=3 Moyenne : 0, n=3

L'algorithme d'autoscaling peut avoir une incidence sur le niveau d'utilisation du processeur cible. Pour atteindre une utilisation cible du processeur plus ou moins élevée, vous pouvez définir la plage d'autoscaling ou l'indication d'utilisation des nœuds de calcul. Des cibles d'utilisation plus élevées peuvent entraîner une baisse des coûts, mais aussi une latence de queue plus élevée, en particulier pour les charges variables.

Latence

Le tableau suivant présente les résultats de référence pour la latence du pipeline en mode "exactement une fois", à l'exclusion de l'étape d'entrée.

Latence de bout en bout totale de l'étape, à l'exclusion de l'étape d'entrée Une seule fois
P50 Moyenne : 1 200 ms, n=3
P95 Moyenne : 3 000 ms, n=3
P99 Moyenne : 5 400 ms, n=3

Les tests ont mesuré la latence de bout en bout par étape (métrique job/streaming_engine/stage_end_to_end_latencies) lors de trois exécutions de tests de longue durée. Cette métrique mesure le temps passé par le moteur de streaming dans chaque étape du pipeline. Il englobe toutes les étapes internes du pipeline, telles que :

  • Mélanger et mettre en file d'attente les messages à traiter
  • Temps de traitement réel (par exemple, conversion des messages en objets de ligne)
  • Écriture de l'état persistant, ainsi que temps passé dans la file d'attente pour écrire l'état persistant

En raison d'une limitation de la métrique, la latence de l'étape d'entrée n'est pas indiquée. Par conséquent, il n'est pas inclus dans le total.

Les benchmarks présentés ici représentent une référence. La latence est très sensible à la complexité du pipeline. Les UDF personnalisées, les transformations supplémentaires et la logique de fenêtrage complexe peuvent toutes augmenter la latence.

Coûts estimés

Vous pouvez estimer le coût de référence de votre propre pipeline comparable avec la facturation basée sur les ressources à l'aide du simulateur de coût Google Cloud Platform, comme suit :

  1. Ouvrez le simulateur de coût.
  2. Cliquez sur Ajouter à l'estimation.
  3. Sélectionnez "Dataflow".
  4. Pour Type de service, sélectionnez "Dataflow Classic".
  5. Sélectionnez Paramètres avancés pour afficher l'ensemble des options.
  6. Choisissez l'emplacement où le job s'exécute.
  7. Dans le champ Job type (Type de job), sélectionnez "Streaming".
  8. Sélectionnez Activer Streaming Engine.
  9. Saisissez les informations concernant les heures d'exécution du job, les nœuds de calcul, les machines de nœud de calcul et le stockage sur disque persistant.
  10. Saisissez le nombre estimé d'unités de calcul Streaming Engine.

L'utilisation des ressources et les coûts évoluent à peu près de manière linéaire avec le débit d'entrée. Toutefois, pour les petits jobs avec seulement quelques nœuds de calcul, le coût total est dominé par les coûts fixes. Pour commencer, vous pouvez extrapoler le nombre de nœuds de calcul et la consommation de ressources à partir des résultats du benchmark.

Par exemple, supposons que vous exécutiez un pipeline de type "map-only" en mode "exactement une fois", avec un débit de données d'entrée de 100 Mio/s. Sur la base des résultats du benchmark pour un pipeline de 1 Gio/s, vous pouvez estimer les besoins en ressources comme suit :

  • Facteur de scaling : (100 Mio/s) / (1 Gio/s) = 0,1
  • Nombre de nœuds de calcul prévus : 63 nœuds de calcul × 0,1 = 6,3 nœuds de calcul
  • Nombre prévu d'unités de calcul Streaming Engine par heure : 58 × 0,1 = 5,8 unités par heure

Cette valeur ne doit être utilisée que comme estimation initiale. Le débit et le coût réels peuvent varier considérablement en fonction de facteurs tels que le type de machine, la distribution de la taille des messages, le code utilisateur, le type d'agrégation, le parallélisme des clés et la taille de la fenêtre. Pour en savoir plus, consultez Bonnes pratiques pour optimiser les coûts Dataflow.

Exécuter un pipeline de test

Cette section présente les commandes gcloud dataflow flex-template run qui ont été utilisées pour exécuter le pipeline de mappage uniquement.

Mode "exactement une fois"

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400

Mode "au moins une fois"

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Kafka_to_BigQuery_Flex \
  --enable-streaming-engine \
  --additional-experiments=streaming_mode_at_least_once \
  --parameters \
readBootstrapServerAndTopic="KAFKA_BOOTSTRAP_ADDRESS;KAFKA_TOPIC",\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
writeMode=SINGLE_TABLE_NAME,\
outputTableSpec="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME",\
useBigQueryDLQ=true,\
outputDeadletterTable="PROJECT_ID:BQ_DATASET.BQ_TABLE_NAME_dlq",\
numStorageWriteApiStreams=400,\
useStorageWriteApiAtLeastOnce=true

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche Dataflow
  • PROJECT_ID : ID du projet
  • KAFKA_BOOTSTRAP_ADDRESS : adresse d'amorçage du cluster Apache Kafka
  • KAFKA_TOPIC : nom du sujet Kafka
  • BQ_DATASET : nom de l'ensemble de données BigQuery
  • BQ_TABLE_NAME : nom de la table BigQuery

Générer des données de test

Pour générer des données de test, exécutez le modèle de générateur de données de streaming à l'aide de la commande suivante :

gcloud dataflow flex-template run JOB_NAME \
  --project=PROJECT_ID \
  --template-file-gcs-location=gs://dataflow-templates-us-central1/latest/flex/Streaming_Data_Generator \
  --max-workers=140 \
  --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=1000000,\
sinkType=KAFKA,\
bootstrapServer=KAFKA_BOOTSTRAP_ADDRESS,\
kafkaTopic=KAFKA_TOPIC,\
outputType=JSON

Remplacez les éléments suivants :

  • JOB_NAME : nom de la tâche Dataflow
  • PROJECT_ID : ID du projet
  • SCHEMA_LOCATION : chemin d'accès à un fichier de schéma dans Cloud Storage
  • KAFKA_BOOTSTRAP_ADDRESS : adresse d'amorçage du cluster Apache Kafka
  • KAFKA_TOPIC : nom du sujet Kafka

Le modèle "Générateur de données de streaming" utilise un fichier JSON Data Generator pour définir le schéma du message. Les tests de référence ont utilisé un schéma de message semblable à celui-ci :

{
  "logStreamId": "{{integer(1000001,2000000)}}",
  "message": "{{alphaNumeric(962)}}"
}

Étapes suivantes