Les pipelines sont exécutés sur des clusters de machines. Ils atteignent un débit élevé en divisant le travail à effectuer, puis en l'exécutant en parallèle sur les différents exécuteurs répartis sur le cluster. En général, plus le nombre de divisions (ou partitions) est élevé, plus le pipeline peut s'exécuter rapidement. Le niveau de parallélisme de votre pipeline est déterminé par les sources et les étapes de brassage de celui-ci.
Sources
Au début de chaque exécution de pipeline, chaque source de votre pipeline calcule les données à lire et comment elles peuvent être divisées en divisions. Prenons l'exemple d'un pipeline de base qui lit des données à partir de Cloud Storage, effectue des transformations Wrangler, puis écrit à nouveau dans Cloud Storage.

Lorsque le pipeline démarre, la source Cloud Storage examine les fichiers d'entrée et les divise en divisions en fonction de leur taille. Par exemple, un fichier d'un gigaoctet peut être divisé en 100 divisions de 10 Mo chacune. Chaque exécuteur lit les données de cette division, exécute les transformations Wrangler, puis écrit la sortie dans un fichier part.
Si votre pipeline s'exécute lentement, l'une des premières choses à vérifier est si vos sources créent suffisamment de divisions pour profiter pleinement du parallélisme. Par exemple, certains types de compression rendent les fichiers en texte brut non divisibles. Si vous lisez des fichiers compressés au format gzip, vous remarquerez peut-être que votre pipeline s'exécute beaucoup plus lentement que si vous lisiez des fichiers non compressés ou des fichiers compressés avec BZIP (qui est divisible). De même, si vous utilisez la source de base de données et que vous l'avez configurée pour n'utiliser qu'une seule division, elle s'exécute beaucoup plus lentement que si vous la configurez pour utiliser davantage de divisions.
Brassages
Certains types de plug-ins entraînent le brassage des données sur le cluster. Cela se produit lorsque les enregistrements traités par un exécuteur doivent être envoyés à un autre exécuteur pour effectuer le calcul. Les brassages sont des opérations coûteuses, car elles impliquent de nombreuses E/S. Les plug-ins qui entraînent le brassage des données s'affichent tous dans la section Analytics de Pipeline Studio. Il s'agit, par exemple, des plug-ins Group By, Deduplicate, Distinct et Joiner. Supposons qu'une étape Group By soit ajoutée au pipeline dans l'exemple précédent.
Supposons également que les données lues représentent des achats effectués dans une épicerie.
Chaque enregistrement contient un champ item et un champ num_purchased. À l'étape Group By, nous configurons le pipeline pour regrouper les enregistrements dans le champ item et calculer la somme du champ num_purchased.
Lorsque le pipeline s'exécute, les fichiers d'entrée sont divisés comme décrit précédemment. Ensuite, chaque enregistrement est brassé sur le cluster de sorte que chaque enregistrement ayant le même élément appartienne au même exécuteur.
Comme illustré dans l'exemple précédent, les enregistrements des achats de pommes étaient initialement répartis sur plusieurs exécuteurs. Pour effectuer l'agrégation, tous ces enregistrements devaient être envoyés sur le cluster au même exécuteur.
La plupart des plug-ins qui nécessitent un brassage vous permettent de spécifier le nombre de partitions à utiliser lors du brassage des données. Cela contrôle le nombre d'exécuteurs utilisés pour traiter les données brassées.
Dans l'exemple précédent, si le nombre de partitions est défini sur 2, chaque exécuteur calcule les agrégats pour deux éléments au lieu d'un.
Notez qu'il est possible de réduire le parallélisme de votre pipeline après cette étape. Par exemple, considérons la vue logique du pipeline :
Si la source divise les données sur 500 partitions, mais que le brassage Group By utilise 200 partitions, le niveau maximal de parallélisme après le Group By passe de 500 à 200. Au lieu de 500 fichiers de parties différents écrits dans Cloud Storage, vous n'en avez que 200.
Choisir des partitions
Si le nombre de partitions est trop faible, vous n'utiliserez pas la pleine capacité de votre cluster pour paralléliser autant de travail que possible. Si vous définissez un nombre de partitions trop élevé, vous augmentez la quantité de frais généraux inutiles. En général, il est préférable d'utiliser trop de partitions que trop peu. Les frais généraux supplémentaires sont un problème si votre pipeline prend quelques minutes à s'exécuter et que vous essayez de gagner quelques minutes. Si votre pipeline prend des heures à s'exécuter, les frais généraux ne sont généralement pas un problème.
Une façon utile, mais trop simpliste, de déterminer le nombre de partitions à utiliser consiste à le définir sur max(cluster CPUs, input records / 500,000). En d'autres termes, prenez le nombre d'enregistrements d'entrée et divisez-le par 500 000. Si ce nombre est supérieur au nombre de processeurs du cluster, utilisez-le pour le nombre de partitions.
Sinon, utilisez le nombre de processeurs du cluster. Par exemple, si votre cluster comporte 100 processeurs et que l'étape de brassage devrait comporter 100 millions d'enregistrements d'entrée, utilisez 200 partitions.
Une réponse plus complète est que les brassages sont plus performants lorsque les données de brassage intermédiaires de chaque partition peuvent tenir complètement dans la mémoire d'un exécuteur, de sorte que rien n'a besoin d'être déversé sur le disque. Spark réserve un peu moins de 30 % de la mémoire d'un exécuteur pour stocker les données de brassage. Le nombre exact est (mémoire totale - 300 Mo) * 30 %. Si nous supposons que chaque exécuteur est configuré pour utiliser 2 Go de mémoire, cela signifie que chaque partition ne doit pas contenir plus de (2 Go - 300 Mo) * 30 % = environ 500 Mo d'enregistrements. Si nous supposons que chaque enregistrement est compressé à 1 Ko, cela signifie que (500 Mo / partition) / (1 Ko/enregistrement) = 500 000 enregistrements par partition. Si vos exécuteurs utilisent plus de mémoire ou si vos enregistrements sont plus petits, vous pouvez ajuster ce nombre en conséquence.
Décalage de données
Notez que dans l'exemple précédent, les achats de différents articles étaient répartis de manière égale. Autrement dit, il y avait trois achats pour les pommes, les bananes, les carottes et les œufs. Le brassage sur une clé répartie de manière égale est le type de brassage le plus performant, mais de nombreux ensembles de données ne possèdent pas cette propriété. En reprenant l'exemple précédent d'achat dans une épicerie, vous vous attendez à avoir beaucoup plus d'achats d'œufs que de cartes de mariage. Lorsque quelques clés de brassage sont beaucoup plus courantes que d'autres, vous êtes confronté à des données décalées. Les données décalées peuvent être beaucoup moins performantes que les données non décalées, car une quantité disproportionnée de travail est effectuée par une poignée d'exécuteurs. Cela entraîne une taille beaucoup plus importante d'un petit sous-ensemble de partitions que toutes les autres.
Dans cet exemple, il y a cinq fois plus d'achats d'œufs que d'achats de cartes, ce qui signifie que l'agrégat d'œufs prend environ cinq fois plus de temps à calculer. Cela n'a pas beaucoup d'importance lorsque vous ne traitez que 10 enregistrements au lieu de deux, mais cela fait une grande différence lorsque vous traitez cinq milliards d'enregistrements au lieu d'un milliard. Lorsque vous avez un décalage de données, le nombre de partitions utilisées dans un brassage n'a pas d'impact important sur les performances du pipeline.
Vous pouvez reconnaître le décalage des données en examinant le graphique des enregistrements de sortie au fil du temps. Si l'étape génère des enregistrements à un rythme beaucoup plus élevé au début de l'exécution du pipeline, puis ralentit soudainement, cela peut signifier que vos données sont décalées.
Vous pouvez également reconnaître le décalage des données en examinant l'utilisation de la mémoire du cluster au fil du temps. Si votre cluster est à pleine capacité pendant un certain temps, mais que l'utilisation de la mémoire est soudainement faible pendant une période donnée, cela indique également que vous êtes confronté à un décalage des données.
Les données décalées ont un impact plus important sur les performances lorsqu'une jointure est effectuée. Il existe quelques techniques qui peuvent être utilisées pour améliorer les performances des jointures décalées. Pour en savoir plus, consultez la section
Traitement en parallèle des opérations JOIN.
Ajustement adaptatif pour l'exécution
Pour ajuster l'exécution de manière adaptative, spécifiez la plage de partitions à utiliser, et non le numéro de partition exact. Le numéro de partition exact, même s'il est défini dans la configuration du pipeline, est ignoré lorsque l'exécution adaptative est activée.
Si vous utilisez un service géré éphémère pour un cluster Apache Spark, Cloud Data Fusion définit automatiquement la configuration appropriée. Toutefois, pour les services gérés statiques pour les clusters Apache Spark ou Hadoop, les deux paramètres de configuration suivants peuvent être définis :
spark.default.parallelism: définissez-le sur le nombre total de cœurs virtuels disponibles dans le cluster. Cela garantit que votre cluster n'est pas sous-chargé et définit la limite inférieure du nombre de partitions.spark.sql.adaptive.coalescePartitions.initialPartitionNum: définissez-le sur 32 fois le nombre de cœurs virtuels disponibles dans le cluster. Cela définit la limite supérieure du nombre de partitions.Spark.sql.adaptive.enabled: pour activer les optimisations, définissez cette valeur surtrue. Le service géré pour Apache Spark le définit automatiquement, mais si vous utilisez des clusters Hadoop génériques, vous devez vous assurer qu'il est activé .
Ces paramètres peuvent être définis dans la configuration du moteur d'un pipeline spécifique ou dans les propriétés du cluster d'un service géré statique pour un cluster Apache Spark.
Étape suivante
- Découvrez le traitement en parallèle des opérations
JOIN.