Prácticas recomendadas para la transformación RunInference

Cuando uses Dataflow para la inferencia de aprendizaje automático, te recomendamos que utilices la transformación RunInference. Usar esta transformación ofrece varias ventajas, entre las que se incluyen las siguientes:

  • Gestión inteligente de la memoria del modelo optimizada para un trabajador de Dataflow al realizar inferencias locales.
  • El procesamiento por lotes dinámico, que usa las características de la canalización y las restricciones definidas por el usuario para optimizar el rendimiento.
  • Funciones de backend de Dataflow basadas en aprendizaje automático que pueden proporcionar un mejor rendimiento y latencia.
  • Mecanismos inteligentes de retardo y escalado automático cuando se alcanzan las cuotas de inferencia remota.
  • Métricas y funciones operativas listas para producción.

Al usar RunInference, debes tener en cuenta varios aspectos:

Gestión de la memoria

Cuando cargas un modelo de aprendizaje automático mediano o grande, es posible que tu máquina se quede sin memoria. Dataflow proporciona herramientas para evitar errores de falta de memoria (OOM) al cargar modelos de aprendizaje automático. Usa la siguiente tabla para determinar el enfoque adecuado para tu situación.

Situación Solución
Los modelos son lo suficientemente pequeños como para caber en la memoria. Usa la transformación RunInference sin ninguna configuración adicional. La transformación RunInference comparte los modelos entre los hilos. Si puedes ajustar un modelo por núcleo de CPU en tu máquina, tu canal puede usar la configuración predeterminada.
Varios modelos entrenados de forma diferente realizan la misma tarea. Usa claves por modelo. Para obtener más información, consulta Ejecutar inferencias de aprendizaje automático con varios modelos entrenados de forma diferente.
Un modelo se carga en la memoria y todos los procesos comparten este modelo.

Usa el parámetro large_model. Para obtener más información, consulta Ejecutar inferencias de aprendizaje automático con varios modelos entrenados de forma diferente.

Si estás creando un controlador de modelo personalizado, en lugar de usar el parámetro large_model, anula el parámetro share_model_across_processes.

Debes configurar el número exacto de modelos cargados en tu máquina.

Para controlar exactamente cuántos modelos se cargan, usa el parámetro model_copies.

Si vas a crear un controlador de modelo personalizado, anula el parámetro model_copies.

Para obtener más información sobre la gestión de memoria con Dataflow, consulta el artículo Solucionar errores de falta de memoria de Dataflow.

Procesamiento por lotes

Hay muchas formas de hacer el procesamiento por lotes en Beam, pero, cuando se realiza una inferencia, te recomendamos que dejes que la transformación RunInference se encargue del procesamiento por lotes. Si tu modelo funciona mejor con un tamaño de lote específico, considera la posibilidad de restringir los parámetros de tamaño de lote objetivo de RunInference. La mayoría de los controladores de modelos exponen los tamaños de lote máximo y mínimo como parámetros. Por ejemplo, para controlar el tamaño del lote que se introduce en una canalización de Hugging Face, puedes definir el siguiente controlador de modelo:

mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16)

La transformación RunInference siempre respeta el tamaño máximo del lote. El tamaño mínimo de lote es un objetivo, pero no se garantiza que se respete en todos los casos. Por ejemplo, consulta la sección Envío por lotes basado en paquetes que aparece a continuación.

Procesamiento por lotes basado en paquetes

Dataflow transfiere datos a las transformaciones en paquetes. El tamaño de estos paquetes puede variar en función de las heurísticas definidas por Dataflow. Por lo general, los paquetes de las canalizaciones por lotes son bastante grandes (O(100s) elementos), mientras que los de las canalizaciones de streaming pueden ser bastante pequeños (incluido el tamaño 1).

De forma predeterminada, RunInference genera lotes a partir de cada paquete y no genera lotes entre paquetes. Esto significa que, si tienes un tamaño de lote mínimo de 8, pero solo quedan 3 elementos en tu paquete, RunInference usará un tamaño de lote de 3. La mayoría de los controladores de modelos exponen un parámetro max_batch_duration_secs que te permite anular este comportamiento. Si se define max_batch_duration_secs, RunInference procesa por lotes en todos los paquetes. Si la transformación no puede alcanzar el tamaño de lote objetivo con un solo paquete, espera un máximo de max_batch_duration_secs antes de generar un lote. Por ejemplo, para habilitar el procesamiento por lotes entre paquetes al usar una canalización de HuggingFace, puedes definir el siguiente controlador de modelo:

mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16, max_batch_duration_secs=3)

Esta función es útil si tienes tamaños de lote muy pequeños en tu canalización. De lo contrario, el coste de sincronización para agrupar en lotes los paquetes no suele merecer la pena, ya que puede provocar un shuffle caro.

Gestionar fallos

Gestionar los errores es una parte importante de cualquier flujo de trabajo de producción. Dataflow procesa los elementos en paquetes arbitrarios y vuelve a intentar procesar el paquete completo si se produce un error en algún elemento de ese paquete. Si no aplicas una gestión de errores adicional, Dataflow vuelve a intentar enviar los paquetes que incluyen un elemento fallido cuatro veces cuando se ejecuta en modo de lote. El flujo de procesamiento falla por completo cuando un solo paquete falla cuatro veces. Cuando se ejecuta en modo de streaming, Dataflow vuelve a intentar procesar un paquete que incluye un elemento fallido indefinidamente, lo que puede provocar que tu flujo de procesamiento se detenga permanentemente.

RunInference proporciona un mecanismo de gestión de errores integrado con su with_exception_handlingfunción. Cuando aplicas esta función, se dirigen todos los errores a un canal de errores independiente PCollection junto con sus mensajes de error. De esta forma, podrás volver a procesarlos. Si asocias operaciones de preprocesamiento o postprocesamiento con tu controlador de modelo, RunInference también las dirigirá a la colección de errores. Por ejemplo, para recoger todos los errores de un controlador de modelo con operaciones de preprocesamiento y postprocesamiento, utiliza la siguiente lógica:

main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()

# handles failed preprocess operations, indexed in the order in which they were applied
other.failed_preprocessing[0] | beam.Map(logging.info)

# handles failed inferences
other.failed_inferences | beam.Map(logging.info)

# handles failed postprocess operations, indexed in the order in which they were applied
other.failed_postprocessing[0] | beam.Map(logging.info)

Tiempos de espera

Cuando usas la función with_exception_handling de RunInference, también puedes definir un tiempo de espera para cada operación, que se cuenta por lote. De esta forma, puedes evitar que una sola inferencia bloqueada haga que toda la canalización no responda. Si se agota el tiempo de espera, el registro correspondiente se dirige al error PCollection, se limpia y se vuelve a crear todo el estado del modelo, y la ejecución normal continúa.

# Timeout execution after 60 seconds
main, other = pcoll | RunInference(model_handler).with_exception_handling(timeout=60)

A partir de Beam 2.68.0, también puedes especificar un tiempo de espera mediante la opción de flujo de procesamiento --element_processing_timeout_minutes. En este caso, un tiempo de espera provoca que se vuelva a intentar completar un elemento de trabajo fallido hasta que se complete correctamente, en lugar de enrutar la inferencia fallida a una cola de mensajes fallidos.

Trabajar con Accelerators

Cuando se usan aceleradores, muchos controladores de modelos tienen configuraciones específicas de aceleradores que se pueden habilitar. Por ejemplo, si usas una GPU y las canalizaciones de Hugging Face, te recomendamos que asignes el valor "GPU" al parámetro device:

mh = HuggingFacePipelineModelHandler('text-classification', device='GPU')

También te recomendamos que empieces con una sola instancia de VM y ejecutes tu canalización de forma local. Para ello, sigue los pasos que se describen en la guía para solucionar problemas de la GPU. De esta forma, se puede reducir significativamente el tiempo necesario para poner en marcha una canalización. Este enfoque también puede ayudarte a comprender mejor el rendimiento de tu trabajo.

Para obtener más información sobre el uso de aceleradores en Dataflow, consulta la documentación de Dataflow sobre GPUs y TPUs.

Gestión de dependencias

Las canalizaciones de aprendizaje automático suelen incluir dependencias grandes e importantes, como PyTorch o TensorFlow. Para gestionar estas dependencias, te recomendamos que uses contenedores personalizados cuando implementes tu trabajo en producción. De esta forma, el trabajo se ejecuta en un entorno estable en varias ejecuciones y se simplifica la depuración.

Para obtener más información sobre la gestión de dependencias, consulta la página Gestión de dependencias de Python de Beam.

Siguientes pasos