Bonnes pratiques pour les pipelines de lots volumineux

Ce document explique comment minimiser l'impact des échecs de tâches pour les grands pipelines de traitement par lot. Les échecs de charges de travail volumineuses ont un impact particulièrement important en raison du temps et de l'argent nécessaires pour les corriger et s'en remettre. Réessayer ces pipelines à partir de zéro en cas d'échec est coûteux en termes de temps et d'argent.

Pour réduire les échecs coûteux des pipelines de traitement par lot, suivez les consignes de cette page. Étant donné que vous ne pouvez pas toujours éviter complètement les éléments et les échecs de pipeline, les techniques fournies se concentrent sur l'augmentation de la résilience, la réduction du coût des échecs et la simplification du débogage et de la compréhension des échecs lorsqu'ils se produisent.

Pour connaître les bonnes pratiques générales concernant les pipelines, consultez Bonnes pratiques pour les pipelines Dataflow.

Effectuer de petits tests pour les grands jobs

Avant d'exécuter un job par lot volumineux, exécutez un ou plusieurs jobs plus petits sur un sous-ensemble de l'ensemble de données. Cette technique peut à la fois fournir une estimation des coûts et aider à identifier les points de défaillance potentiels.

Estimation du coût

L'exécution de tests peut fournir une estimation du coût plancher total de l'exécution du job. En général, le coût du job est calculé comme suit : cost of test job*size(full dataset)/size(test dataset). Selon le pipeline, le coût peut augmenter de manière superlinéaire ou, moins souvent, sous-linéaire. Néanmoins, cette étape fournit souvent une bonne estimation approximative du coût du job. Vous pouvez également essayer différentes tailles d'entrées pour mieux estimer l'évolution de vos coûts. Utilisez ces informations pour décider de poursuivre avec le pipeline existant ou de le réarchitecturer afin de réduire les coûts.

Identifier les points de défaillance

L'exécution de tests peut révéler des bugs, des points de défaillance potentiels ou des problèmes de configuration et d'efficacité potentiels. Vous pouvez également examiner d'autres métriques de pipeline, telles que les suivantes :

  • Si votre pipeline utilise presque toute la mémoire disponible, il peut rencontrer des exceptions de mémoire insuffisante (OOM) en cas de charge plus élevée ou avec des enregistrements exceptionnellement volumineux. Vous devrez peut-être provisionner plus de mémoire pour votre job final afin d'éviter ces erreurs OOM.
  • Si le débit de votre pipeline connaît des baisses, examinez les journaux de votre pipeline pour en déterminer la cause. Vous pouvez trouver un élément bloqué ou une partie de votre ensemble de données dont les performances sont particulièrement médiocres. Vous pouvez traiter ces points de données séparément ou appliquer un délai avant traitement des éléments. Pour en savoir plus, consultez la section Définir un délai d'expiration pour les enregistrements coûteux de ce document.
  • Si votre pipeline est beaucoup moins performant pour une tâche sur Dataflow qu'en local, examinez sa logique pour en comprendre la raison. Par exemple, si vous obtenez le même débit avec huit cœurs sur Dataflow qu'avec un cœur en local, il est possible que le job soit limité par la contention pour une ressource. Si vous constatez que vos performances sont moins bonnes que prévu, envisagez d'utiliser une ou plusieurs des options suivantes :
    • Exécutez davantage de tests avec différentes configurations de machines ou de logiciels.
    • Testez localement avec plusieurs cœurs en même temps.
    • Inspectez votre code pour identifier les éventuels goulots d'étranglement lors du déploiement à grande échelle.

Si votre pipeline comporte des recommandations Dataflow, suivez-les pour améliorer les performances.

Utiliser des files d'attente de lettres mortes pour gérer les données incorrectes inattendues

Les pipelines réussissent souvent sur la plupart des éléments d'entrée, mais échouent sur un petit sous-ensemble de l'entrée. Vous ne remarquerez peut-être pas ce problème lorsque vous exécuterez des tests de petite taille, car ils ne testent qu'un sous-ensemble de l'entrée. Par défaut, Dataflow relance ces tâches ayant échoué quatre fois en mode de traitement par lot et un nombre illimité de fois en mode de traitement par flux. En mode par lot, l'intégralité de votre job échoue une fois la limite de tentatives atteinte. En mode de traitement par flux, elle peut rester indéfiniment bloquée.

Dans de nombreux jobs, vous pouvez exclure ces éléments défaillants du pipeline et effectuer le reste du job à l'aide d'une file d'attente de lettres mortes (file d'attente des messages non traités). La file d'attente des messages non distribués transmet les enregistrements ayant échoué à une sortie distincte PCollection, que vous pouvez gérer séparément de votre sortie principale. Cette configuration vous permet de concevoir une règle pour ces enregistrements. Par exemple, vous pouvez les écrire manuellement dans Pub/Sub, les inspecter et les nettoyer, puis les retraiter.

De nombreuses transformations Apache Beam sont compatibles avec les files d'attente de lettres mortes. En Java, vous pouvez y accéder avec un objet ErrorHandler. En Python, vous pouvez y accéder à l'aide de la méthode with_exception_handling. Certaines transformations ont des méthodes personnalisées pour définir les files d'attente de lettres mortes. Pour en savoir plus, consultez la documentation sur la transformation. Pour en savoir plus, consultez Utiliser des files d'attente de messages non distribués pour la gestion des erreurs.

Pour déterminer si votre job répond aux critères d'une file d'attente de messages non distribués, consultez la section Limites de ce document.

Limites de la file d'attente de lettres mortes

Dans les scénarios suivants, une file d'attente de messages non distribués peut ne pas être utile :

  • Échecs du cycle de vie complet du worker ou de DoFn. Si le traitement échoue pour l'ensemble du nœud de calcul ou du bundle, une file d'attente de messages non distribués ne peut pas détecter l'échec. Par exemple, si votre pipeline rencontre une exception de mémoire insuffisante (OOM), toutes les tâches actives sur la VM échouent et sont relancées, sans rien envoyer à la file d'attente des messages non distribués.
  • Combinaisons ou autres agrégations Si votre pipeline effectue des calculs qui nécessitent que tous les éléments d'entrée soient présents et traités dans le résultat, soyez prudent lorsque vous utilisez une file d'attente de messages non distribués avant cette étape. L'utilisation d'une file d'attente de messages non distribués exclut une partie de vos données d'entrée du résultat. L'ajout d'une file d'attente de lettres mortes peut échanger l'exactitude contre la tolérance aux pannes.
  • Échecs sur le chemin de la file d'attente de lettres mortes. Si un élément échoue lors de son envoi au récepteur de file d'attente de messages non distribués, l'ensemble du pipeline peut échouer. Pour éviter cet échec, gardez la logique de votre file d'attente de lettres mortes aussi simple que possible. Vous pouvez ajouter une étape d'attente (voir wait class) pour vous assurer que votre entrée principale se termine avant l'écriture de vos éléments de file d'attente de messages non distribués. Cette configuration peut réduire les performances et retarder les signaux d'erreur de votre pipeline.
  • Éléments partiellement transformés. Si vous insérez une file d'attente de lettres mortes au milieu de votre pipeline, elle peut générer l'élément partiellement transformé et ne pas avoir accès à l'élément d'origine. Par conséquent, vous ne pouvez pas nettoyer l'élément ni réexécuter le pipeline sur celui-ci. Vous devrez peut-être appliquer une logique différente pour corréler la sortie de la file d'attente des messages non distribués à l'élément d'origine, ou vous devrez peut-être interpréter et traiter l'élément partiellement transformé. Cela peut également entraîner des résultats incohérents. Par exemple, si des éléments sont envoyés dans deux branches d'un pipeline et que chaque branche envoie des éléments à l'origine d'exceptions à une file d'attente de messages non distribués, un seul élément d'entrée peut être envoyé dans l'une ou l'autre des branches, dans les deux ou dans aucune.

Délai d'expiration des enregistrements coûteux

Les pipelines peuvent cesser de répondre lors du traitement d'un petit sous-ensemble d'éléments plus coûteux ou qui atteignent une limite entraînant une absence de réponse, comme un blocage. Pour atténuer ce problème, certaines transformations vous permettent de définir un délai d'expiration et de faire échouer les éléments ayant expiré dans tous les DoFn de code utilisateur qui rencontrent ce problème. Par exemple, vous pouvez utiliser la méthode with_exception_handling de Python. Lorsque vous utilisez des délais d'attente avec une file d'attente de messages non distribués, votre pipeline peut continuer à traiter les éléments sains et à progresser, et vous pouvez retraiter les éléments coûteux séparément. Cette configuration peut entraîner un coût en termes de performances.

Pour déterminer quelles opérations DoFn sont susceptibles d'expirer, exécutez de petits tests avant de lancer votre pipeline complet.

Activer l'autoscaling vertical

Si vous ne savez pas exactement de quelle quantité de mémoire votre job a besoin ou si vous pensez qu'il risque de manquer de mémoire, activez l'autoscaling vertical. Cette fonctionnalité permet d'éviter les échecs de mémoire saturée lorsque les pipelines s'exécutent à grande échelle ou lorsqu'ils rencontrent des éléments exceptionnellement volumineux.

Étant donné que l'autoscaling vertical peut augmenter le coût de votre job et n'empêche pas tous les échecs de mémoire insuffisante, vous devez toujours résoudre les problèmes de consommation excessive de mémoire. L'autoscaling vertical nécessite également Dataflow Prime, qui présente des limites supplémentaires et un modèle de facturation différent.

Utiliser l'exécution spéculative pour éviter les tâches lentes

Pour les pipelines par lot, vous pouvez activer l'exécution spéculative, une fonctionnalité permettant d'atténuer l'impact des tâches lentes ou bloquées. Ces tâches lentes ou bloquées sont également appelées tâches lentes. Cette fonctionnalité lance des exécutions redondantes ou de sauvegarde pour les tâches qui prennent trop de temps. La première tâche à se terminer est utilisée, et l'autre est annulée, ce qui peut améliorer le temps d'exécution global de votre pipeline.

L'exécution spéculative peut aider les pipelines à se terminer plus rapidement en fournissant un chemin d'exécution alternatif pour les éléments de travail qui subissent des retards en raison de la lenteur des machines de nœud de calcul ou d'autres problèmes temporaires tels que des bugs non déterministes, une limitation des ressources ou des problèmes de connectivité.

Limites et points à noter

Avant d'activer l'exécution spéculative, tenez compte des points suivants :

  • Pipelines de traitement en flux continu : l'exécution spéculative n'est pas compatible avec les pipelines de traitement en flux continu.
  • Variation potentielle des coûts : il est difficile d'estimer l'impact de cette fonctionnalité sur les coûts, car il est difficile de prédire les tâches lentes et le provisionnement des tâches de sauvegarde. Par exemple, bien qu'un élément de travail de sauvegarde consomme des ressources supplémentaires, ce qui peut augmenter les coûts, son achèvement plus rapide peut en revanche entraîner des économies de ressources et une réduction des coûts. Dans les deux cas, l'impact global devrait être minime.
  • Éléments de travail de longue durée cohérents : l'exécution spéculative peut ne pas être d'une grande aide pour les éléments de travail de longue durée cohérents tels que les touches de raccourci, car le problème sous-jacent à l'origine de la lenteur persiste.

Pour en savoir plus sur les retardataires dans les jobs par lot, consultez Résoudre les problèmes liés aux retardataires dans les jobs par lot.

Activer l'exécution spéculative

Pour activer l'exécution spéculative, utilisez l'option de service Dataflow map_task_backup_mode. Deux modes sont disponibles :

Java

  • --dataflowServiceOptions=map_task_backup_mode=ON
  • --dataflowServiceOptions=map_task_backup_mode=CAUTIOUS

Python / Go

  • --dataflow_service_options=map_task_backup_mode=ON
  • --dataflow_service_options=map_task_backup_mode=CAUTIOUS

En mode ON, une tâche de sauvegarde est planifiée si la durée d'exécution prévue de la tâche d'origine est environ 20 % plus longue que celle d'une nouvelle tâche.

En mode CAUTIOUS, une tâche de sauvegarde est planifiée si la durée d'exécution prévue de la tâche d'origine est environ 70 % plus longue que celle d'une nouvelle tâche.

Pour vérifier que l'exécution spéculative est activée, consultez les messages du journal. Recherchez les entrées indiquant que des tâches de sauvegarde ont été lancées. Cela confirme que l'exécution spéculative est déclenchée. Pour afficher ces journaux, accédez au panneau Journaux des jobs de votre pipeline (Jobs > choisissez votre job > section Logs > Journaux des jobs). Le message du journal se présente comme suit :

Backup issued in step STEP_NAME. ADDITIONAL_INFORMATION.

Solutions pour les pipelines sujets aux échecs

Certains pipelines sont particulièrement sujets aux erreurs. Bien qu'il soit préférable de résoudre la source de ces erreurs, vous pouvez envisager les options suivantes pour réduire le coût des échecs.

Matérialiser les résultats intermédiaires

Les pipelines peuvent comporter une ou plusieurs transformations particulièrement coûteuses qui dominent le temps d'exécution du pipeline. Les échecs de pipeline après cette transformation peuvent être particulièrement préjudiciables, car tout le travail déjà effectué est perdu. Pour éviter ce scénario, envisagez d'écrire les PCollections intermédiaires générés par des étapes coûteuses dans un récepteur tel que Cloud Storage. Cette configuration réduit le coût d'un échec. Vous devez évaluer cet avantage par rapport au coût de l'écriture supplémentaire. Vous pouvez utiliser ce résultat matérialisé de l'une des manières suivantes :

  1. Divisez votre pipeline d'origine en deux : l'un écrit le résultat intermédiaire et l'autre le lit.
  2. En cas d'échec du pipeline uniquement, lisez et aplatissez les résultats de votre source d'origine et de votre collection intermédiaire matérialisée.

Pour vous assurer que ces matérialisations sont écrites avant tout traitement ultérieur, ajoutez une étape d'attente (voir wait class) avant toute étape de traitement ultérieure.