Práticas recomendadas para a transformação RunInference

Ao usar o Dataflow para inferência de ML, recomendamos usar a transformação RunInference. Usar essa transformação traz vários benefícios, incluindo:

  • Gerenciamento inteligente de memória do modelo otimizado para um worker do Dataflow ao realizar inferência local.
  • Agrupamento dinâmico que usa características do pipeline e restrições definidas pelo usuário para otimizar a performance.
  • Recursos de back-end do Dataflow com reconhecimento de ML que podem oferecer melhor capacidade de processamento e latência.
  • Mecanismos inteligentes de espera e autoescalonamento ao encontrar cotas de inferência remota.
  • Métricas prontas para Production e recursos operacionais.

Ao usar RunInference, considere o seguinte:

Gerenciamento de memória

Ao carregar um modelo de ML médio ou grande, sua máquina pode ficar sem memória. O Dataflow oferece ferramentas para ajudar a evitar erros de falta de memória (OOM) ao carregar modelos de ML. Use a tabela a seguir para determinar a abordagem adequada ao seu cenário.

Cenário Solução
Os modelos são pequenos o suficiente para caber na memória. Use a transformação RunInference sem outras configurações. A transformação RunInference compartilha os modelos entre threads. Se for possível ajustar um modelo por núcleo de CPU na máquina, o pipeline poderá usar a configuração padrão.
Vários modelos treinados de maneira diferente estão realizando a mesma tarefa. Use chaves por modelo. Para mais informações, consulte Executar inferência de ML com vários modelos treinados de maneira diferente.
Um modelo é carregado na memória, e todos os processos compartilham esse modelo.

Use o parâmetro large_model. Para mais informações, consulte Executar inferência de ML com vários modelos treinados de maneira diferente.

Se você estiver criando um gerenciador de modelos personalizado, em vez de usar o parâmetro large_model, substitua o parâmetro share_model_across_processes.

Você precisa configurar o número exato de modelos carregados na sua máquina.

Para controlar exatamente quantos modelos são carregados, use o parâmetro model_copies.

Se você estiver criando um gerenciador de modelos personalizado, substitua o parâmetro model_copies.

Para mais informações sobre gerenciamento de memória com o Dataflow, consulte Resolver problemas de falta de memória no Dataflow.

Lote

Há muitas maneiras de fazer o processamento em lote no Beam, mas ao realizar inferências, recomendamos que você deixe a transformação RunInference cuidar disso. Se o modelo tiver o melhor desempenho com um tamanho de lote específico, considere restringir os parâmetros de tamanho de lote de destino de RunInference. A maioria dos manipuladores de modelos expõe os tamanhos máximo e mínimo de lote como parâmetros. Por exemplo, para controlar o tamanho do lote inserido em um pipeline do HuggingFace, você pode definir o seguinte manipulador de modelo:

mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16)

A transformação RunInference sempre respeita o tamanho máximo do lote. O tamanho mínimo do lote é uma meta, mas não há garantia de que será respeitado em todos os casos. Por exemplo, consulte Lotes baseados em pacotes na seção a seguir.

Lote com base em pacote

O Dataflow transmite dados para transformações em pacotes. Esses pacotes podem variar de tamanho dependendo das heurísticas definidas pelo Dataflow. Normalmente, os pacotes em pipelines em lote são bem grandes (O(100s) elementos), enquanto nos pipelines de streaming eles podem ser bem pequenos (incluindo tamanho 1).

Por padrão, o RunInference gera lotes de cada pacote e não cria lotes entre pacotes. Isso significa que, se você tiver um tamanho mínimo de lote de 8, mas apenas 3 elementos restantes no pacote, RunInference usará um tamanho de lote de 3. A maioria dos manipuladores de modelos expõe um parâmetro max_batch_duration_secs que permite substituir esse comportamento. Se max_batch_duration_secs estiver definido, RunInference será agrupado em lotes em vários pacotes. Se a transformação não conseguir atingir o tamanho do lote desejado com um único pacote, ela vai esperar no máximo max_batch_duration_secs antes de gerar um lote. Por exemplo, para ativar o processamento em lote entre pacotes ao usar um pipeline do HuggingFace, defina o seguinte manipulador de modelo:

mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16, max_batch_duration_secs=3)

Esse recurso é útil se você tiver tamanhos de lote muito pequenos no seu pipeline. Caso contrário, o custo de sincronização para processar em lote em vários pacotes geralmente não vale a pena usar, porque pode causar um embaralhamento caro.

Como corrigir falhas

O tratamento de erros é uma parte importante de qualquer pipeline de produção. O Dataflow processa elementos em pacotes arbitrários e repete o pacote completo se um erro ocorrer em qualquer elemento dele. Se você não aplicar um tratamento de erros adicional, o Dataflow vai repetir os pacotes que incluem um item com falha quatro vezes ao executar no modo em lote. O pipeline falha completamente quando um único pacote falha quatro vezes. Em execuções no modo de streaming, o Dataflow repete um pacote que inclui um item com falha indefinidamente, o que pode causar a parada permanente do pipeline.

O RunInference oferece um mecanismo de tratamento de erros integrado com a função with_exception_handling. Quando você aplica essa função, ela encaminha todas as falhas para um PCollection separado, junto com as mensagens de erro. Isso permite que você os reprocesse. Se você associar operações de pré-processamento ou pós-processamento ao manipulador de modelo, o RunInference também vai encaminhar essas operações para a coleta de falhas. Por exemplo, para reunir todas as falhas de um gerenciador de modelos com operações de pré-processamento e pós-processamento, use a seguinte lógica:

main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()

# handles failed preprocess operations, indexed in the order in which they were applied
other.failed_preprocessing[0] | beam.Map(logging.info)

# handles failed inferences
other.failed_inferences | beam.Map(logging.info)

# handles failed postprocess operations, indexed in the order in which they were applied
other.failed_postprocessing[0] | beam.Map(logging.info)

Tempo limite

Ao usar o recurso with_exception_handling do RunInference, você também pode definir um tempo limite para cada operação, que é contado por lote. Isso evita que uma única inferência travada deixe todo o pipeline sem resposta. Se um tempo limite ocorrer, o registro com tempo esgotado será encaminhado para a falha PCollection, todo o estado do modelo será limpo e recriado, e a execução normal continuará.

# Timeout execution after 60 seconds
main, other = pcoll | RunInference(model_handler).with_exception_handling(timeout=60)

A partir do Beam 2.68.0, também é possível especificar um tempo limite usando a opção de pipeline --element_processing_timeout_minutes. Nesse caso, um tempo limite faz com que um item de trabalho com falha seja repetido até ser concluído, em vez de rotear a inferência com falha para uma fila de mensagens não entregues.

Como trabalhar com aceleradores

Ao usar aceleradores, muitos gerenciadores de modelos têm configurações específicas que podem ser ativadas. Por exemplo, ao usar uma GPU e pipelines do Hugging Face, recomendamos definir o parâmetro device como GPU:

mh = HuggingFacePipelineModelHandler('text-classification', device='GPU')

Também recomendamos que você comece com uma única instância de VM e execute o pipeline localmente nela. Para fazer isso, siga as etapas descritas no guia de solução de problemas de GPU. Isso pode reduzir significativamente o tempo necessário para executar um pipeline. Essa abordagem também pode ajudar você a entender melhor a performance do seu trabalho.

Para mais informações sobre o uso de aceleradores no Dataflow, consulte a documentação do Dataflow sobre GPUs e TPUs.

Gerenciamento de dependências

Os pipelines de ML geralmente incluem dependências grandes e importantes, como PyTorch ou TensorFlow. Para gerenciar essas dependências, recomendamos usar contêineres personalizados ao implantar seu job em produção. Isso garante que o job seja executado em um ambiente estável em várias execuções e simplifica a depuração.

Para mais informações sobre gerenciamento de dependências, consulte a página de gerenciamento de dependências do Python do Beam.

A seguir