Bonnes pratiques concernant la transformation RunInference

Lorsque vous utilisez Dataflow pour l'inférence ML, nous vous recommandons d'utiliser la transformation RunInference. L'utilisation de cette transformation présente plusieurs avantages, y compris les suivants :

  • Gestion intelligente de la mémoire du modèle optimisée pour un nœud de calcul Dataflow lors de l'inférence locale.
  • Le traitement par lots dynamique, qui utilise les caractéristiques du pipeline et les contraintes définies par l'utilisateur pour optimiser les performances.
  • Fonctionnalités de backend Dataflow compatibles avec le ML, qui peuvent améliorer le débit et la latence.
  • Mécanismes intelligents de backoff et d'autoscaling en cas de dépassement des quotas d'inférence à distance.
  • Métriques et fonctionnalités opérationnelles prêtes pour la production.

Lorsque vous utilisez RunInference, vous devez tenir compte de plusieurs points :

Gestion de la mémoire

Lorsque vous chargez un modèle de ML de taille moyenne ou grande, votre machine peut manquer de mémoire. Dataflow fournit des outils pour éviter les erreurs de mémoire saturée (OOM, Out Of Memory) lors du chargement de modèles de ML. Utilisez le tableau suivant pour déterminer l'approche appropriée à votre scénario.

Scénario Solution
Les modèles sont suffisamment petits pour tenir dans la mémoire. Utilisez la transformation RunInference sans aucune configuration supplémentaire. La transformation RunInference partage les modèles entre les threads. Si vous pouvez adapter un modèle par cœur de processeur sur votre machine, votre pipeline peut utiliser la configuration par défaut.
Plusieurs modèles entraînés différemment effectuent la même tâche. Utilisez des clés par modèle. Pour en savoir plus, consultez Exécuter l'inférence ML avec plusieurs modèles entraînés différemment.
Un modèle est chargé en mémoire et tous les processus le partagent.

Utilisez le paramètre large_model. Pour en savoir plus, consultez Exécuter l'inférence ML avec plusieurs modèles entraînés différemment.

Si vous créez un gestionnaire de modèle personnalisé, au lieu d'utiliser le paramètre large_model, remplacez le paramètre share_model_across_processes.

Vous devez configurer le nombre exact de modèles chargés sur votre machine.

Pour contrôler précisément le nombre de modèles chargés, utilisez le paramètre model_copies.

Si vous créez un gestionnaire de modèles personnalisé, remplacez le paramètre model_copies.

Pour en savoir plus sur la gestion de la mémoire avec Dataflow, consultez Résoudre les erreurs Dataflow de mémoire insuffisante.

Traitement par lot

Il existe de nombreuses façons de regrouper les données par lot dans Beam, mais lorsque vous effectuez une inférence, nous vous recommandons de laisser la transformation RunInference gérer le regroupement par lot. Si votre modèle fonctionne mieux avec une taille de lot spécifique, envisagez de contraindre les paramètres de taille de lot cible de RunInference. La plupart des gestionnaires de modèles exposent les tailles de lot maximales et minimales en tant que paramètres. Par exemple, pour contrôler la taille du lot transmis à un pipeline HuggingFace, vous pouvez définir le gestionnaire de modèle suivant :

mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16)

La transformation RunInference respecte toujours la taille maximale du lot. La taille de lot minimale est un objectif, mais il n'est pas garanti qu'elle soit respectée dans tous les cas. Par exemple, consultez Regroupement par lot basé sur les bundles dans la section suivante.

Traitement par lots basé sur les bundles

Dataflow transmet les données aux transformations dans des bundles. La taille de ces bundles peut varier en fonction des heuristiques définies par Dataflow. En règle générale, les bundles des pipelines par lots sont assez volumineux (O(100 éléments)), tandis que ceux des pipelines de streaming peuvent être assez petits (y compris de taille 1).

Par défaut, RunInference génère des lots à partir de chaque bundle et ne regroupe pas les bundles. Cela signifie que si vous avez une taille de lot minimale de 8, mais qu'il ne reste que trois éléments dans votre bundle, RunInference utilise une taille de lot de 3. La plupart des gestionnaires de modèles exposent un paramètre max_batch_duration_secs qui vous permet de remplacer ce comportement. Si max_batch_duration_secs est défini, RunInference regroupe les lots dans les bundles. Si la transformation ne peut pas atteindre la taille de lot cible avec un seul bundle, elle attend au maximum max_batch_duration_secs avant de générer un lot. Par exemple, pour activer le traitement par lot multi-bundle lorsque vous utilisez un pipeline HuggingFace, vous pouvez définir le gestionnaire de modèle suivant :

mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16, max_batch_duration_secs=3)

Cette fonctionnalité est utile si vous rencontrez des tailles de lot très faibles dans votre pipeline. Sinon, le coût de synchronisation pour le traitement par lot dans les bundles ne vaut généralement pas la peine d'être utilisé, car il peut entraîner un mélange coûteux.

Gérer les échecs

La gestion des erreurs est un élément important de tout pipeline de production. Dataflow traite les éléments sous forme d'ensembles arbitraires et relance l'ensemble complet si une erreur se produit pour l'un des éléments qu'il contient. Si vous n'appliquez pas de gestion des exceptions supplémentaire, Dataflow relance les ensembles qui incluent un élément défaillant quatre fois lorsqu'il s'exécute en mode de traitement par lots. Le pipeline échoue complètement lorsqu'un ensemble échoue quatre fois. Lors de l'exécution en mode de traitement en flux continu, Dataflow relance indéfiniment un bundle comprenant un élément défaillant, ce qui risque de bloquer votre pipeline de manière permanente.

RunInference fournit un mécanisme de gestion des exceptions intégré avec sa fonction with_exception_handling. Lorsque vous appliquez cette fonction, elle redirige tous les échecs vers un PCollection d'échec distinct, ainsi que leurs messages d'erreur. Cela vous permet de les retraiter. Si vous associez des opérations de prétraitement ou de post-traitement à votre gestionnaire de modèles, RunInference les redirige également vers la collection d'échecs. Par exemple, pour collecter tous les échecs d'un gestionnaire de modèles avec des opérations de prétraitement et de post-traitement, utilisez la logique suivante :

main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()

# handles failed preprocess operations, indexed in the order in which they were applied
other.failed_preprocessing[0] | beam.Map(logging.info)

# handles failed inferences
other.failed_inferences | beam.Map(logging.info)

# handles failed postprocess operations, indexed in the order in which they were applied
other.failed_postprocessing[0] | beam.Map(logging.info)

Délais avant expiration

Lorsque vous utilisez la fonctionnalité with_exception_handling de RunInference, vous pouvez également définir un délai avant expiration pour chaque opération, qui est comptabilisé par lot. Cela vous permet d'éviter qu'une seule inférence bloquée ne rende l'ensemble du pipeline non réactif. En cas de délai dépassé, l'enregistrement concerné est routé vers l'échec PCollection, l'état du modèle est entièrement nettoyé et recréé, et l'exécution normale se poursuit.

# Timeout execution after 60 seconds
main, other = pcoll | RunInference(model_handler).with_exception_handling(timeout=60)

À partir de Beam 2.68.0, vous pouvez également spécifier un délai d'attente à l'aide de l'option de pipeline --element_processing_timeout_minutes. Dans ce cas, un délai avant expiration entraîne une nouvelle tentative d'exécution d'un élément de travail ayant échoué jusqu'à ce qu'il réussisse, au lieu de router l'inférence ayant échoué vers une file d'attente des messages non distribués.

Utiliser des accélérateurs

Lorsque vous utilisez des accélérateurs, de nombreux gestionnaires de modèles disposent de configurations spécifiques aux accélérateurs que vous pouvez activer. Par exemple, lorsque vous utilisez un GPU et des pipelines Hugging Face, nous vous recommandons de définir le paramètre device sur GPU :

mh = HuggingFacePipelineModelHandler('text-classification', device='GPU')

Nous vous recommandons également de commencer par une seule instance de VM et d'y exécuter votre pipeline en local. Pour ce faire, suivez la procédure décrite dans le guide de dépannage des GPU. Cela peut réduire considérablement le temps nécessaire à l'exécution d'un pipeline. Cette approche peut également vous aider à mieux comprendre les performances de votre tâche.

Pour en savoir plus sur l'utilisation des accélérateurs dans Dataflow, consultez la documentation Dataflow sur les GPU et les TPU.

Gestion des dépendances

Les pipelines de ML incluent souvent des dépendances importantes et volumineuses, telles que PyTorch ou TensorFlow. Pour gérer ces dépendances, nous vous recommandons d'utiliser des conteneurs personnalisés lorsque vous déployez votre job en production. Cela garantit que votre job s'exécute dans un environnement stable sur plusieurs exécutions et simplifie le débogage.

Pour en savoir plus sur la gestion des dépendances, consultez la page Gestion des dépendances Python de Beam.

Étapes suivantes