Ao usar o Dataflow para inferência de ML, recomendamos usar a transformação
RunInference. Usar essa transformação traz vários benefícios, incluindo:
- Gerenciamento inteligente de memória do modelo otimizado para um worker do Dataflow ao realizar inferência local.
- Agrupamento dinâmico que usa características do pipeline e restrições definidas pelo usuário para otimizar a performance.
- Recursos de back-end do Dataflow com reconhecimento de ML que podem oferecer melhor capacidade de processamento e latência.
- Mecanismos inteligentes de espera e autoescalonamento ao encontrar cotas de inferência remota.
- Métricas prontas para Production e recursos operacionais.
Ao usar RunInference, considere o seguinte:
Gerenciamento de memória
Ao carregar um modelo de ML médio ou grande, sua máquina pode ficar sem memória. O Dataflow oferece ferramentas para ajudar a evitar erros de falta de memória (OOM) ao carregar modelos de ML. Use a tabela a seguir para determinar a abordagem adequada ao seu cenário.
| Cenário | Solução |
|---|---|
| Os modelos são pequenos o suficiente para caber na memória. |
Use a transformação RunInference sem outras configurações. A transformação RunInference compartilha os modelos entre
threads. Se for possível ajustar um modelo por núcleo de CPU na máquina, o pipeline poderá usar a configuração padrão.
|
| Vários modelos treinados de maneira diferente estão realizando a mesma tarefa. | Use chaves por modelo. Para mais informações, consulte Executar inferência de ML com vários modelos treinados de maneira diferente. |
| Um modelo é carregado na memória, e todos os processos compartilham esse modelo. |
Use o parâmetro Se você estiver criando um gerenciador de modelos personalizado, em vez de usar o parâmetro
|
| Você precisa configurar o número exato de modelos carregados na sua máquina. |
Para controlar exatamente quantos modelos são carregados, use o parâmetro
Se você estiver criando um gerenciador de modelos personalizado, substitua o parâmetro
|
Para mais informações sobre gerenciamento de memória com o Dataflow, consulte Resolver problemas de falta de memória no Dataflow.
Lote
Há muitas maneiras de fazer o processamento em lote no Beam, mas ao realizar inferências, recomendamos que você deixe a transformação RunInference cuidar disso. Se o modelo tiver o melhor desempenho com um tamanho de lote específico, considere restringir os parâmetros de tamanho de lote de destino de RunInference. A maioria dos manipuladores de modelos expõe os tamanhos máximo e mínimo de lote como parâmetros. Por exemplo, para controlar o tamanho do lote
inserido em um pipeline do HuggingFace, você pode definir o seguinte manipulador de modelo:
mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16)
A transformação RunInference sempre respeita o tamanho máximo do lote. O tamanho mínimo do lote é uma meta, mas não há garantia de que será respeitado em todos os casos. Por exemplo, consulte Lotes baseados em pacotes na seção a seguir.
Lote com base em pacote
O Dataflow transmite dados para transformações em pacotes. Esses pacotes podem variar de tamanho dependendo das heurísticas definidas pelo Dataflow. Normalmente, os pacotes em pipelines em lote são bem grandes (O(100s) elementos), enquanto nos pipelines de streaming eles podem ser bem pequenos (incluindo tamanho 1).
Por padrão, o RunInference gera lotes de cada pacote e não
cria lotes entre pacotes. Isso significa que, se você tiver um tamanho mínimo de lote de 8, mas apenas 3 elementos restantes no pacote, RunInference usará um tamanho de lote de 3. A maioria dos manipuladores de modelos expõe um parâmetro max_batch_duration_secs que permite substituir esse comportamento. Se max_batch_duration_secs estiver definido, RunInference
será agrupado em lotes em vários pacotes. Se a transformação não conseguir atingir o tamanho do lote desejado com um único pacote, ela vai esperar no máximo max_batch_duration_secs antes de gerar um lote. Por exemplo, para ativar o processamento em lote entre pacotes ao usar um pipeline do HuggingFace, defina o seguinte manipulador de modelo:
mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16, max_batch_duration_secs=3)
Esse recurso é útil se você tiver tamanhos de lote muito pequenos no seu pipeline. Caso contrário, o custo de sincronização para processar em lote em vários pacotes geralmente não vale a pena usar, porque pode causar um embaralhamento caro.
Como corrigir falhas
O tratamento de erros é uma parte importante de qualquer pipeline de produção. O Dataflow processa elementos em pacotes arbitrários e repete o pacote completo se um erro ocorrer em qualquer elemento dele. Se você não aplicar um tratamento de erros adicional, o Dataflow vai repetir os pacotes que incluem um item com falha quatro vezes ao executar no modo em lote. O pipeline falha completamente quando um único pacote falha quatro vezes. Em execuções no modo de streaming, o Dataflow repete um pacote que inclui um item com falha indefinidamente, o que pode causar a parada permanente do pipeline.
O RunInference oferece um mecanismo de tratamento de erros integrado com a
função
with_exception_handling.
Quando você aplica essa função, ela encaminha todas as falhas para um PCollection separado, junto com as mensagens de erro. Isso permite que você os reprocesse. Se você associar operações de pré-processamento ou pós-processamento ao manipulador de modelo, o RunInference também vai encaminhar essas operações para a coleta de falhas. Por exemplo, para reunir todas as falhas de um gerenciador de modelos com operações de pré-processamento e pós-processamento, use a seguinte lógica:
main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()
# handles failed preprocess operations, indexed in the order in which they were applied
other.failed_preprocessing[0] | beam.Map(logging.info)
# handles failed inferences
other.failed_inferences | beam.Map(logging.info)
# handles failed postprocess operations, indexed in the order in which they were applied
other.failed_postprocessing[0] | beam.Map(logging.info)
Tempo limite
Ao usar o recurso with_exception_handling do RunInference, você também pode definir um tempo limite para cada operação, que é contado por lote. Isso evita que uma única inferência travada deixe todo o pipeline sem resposta. Se um
tempo limite ocorrer, o registro com tempo esgotado será encaminhado para a falha PCollection, todo o
estado do modelo será limpo e recriado, e a execução normal continuará.
# Timeout execution after 60 seconds
main, other = pcoll | RunInference(model_handler).with_exception_handling(timeout=60)
A partir do Beam 2.68.0, também é possível especificar um tempo limite usando a
opção de pipeline --element_processing_timeout_minutes. Nesse caso, um tempo limite faz com que um item de trabalho com falha seja repetido até ser concluído, em vez de rotear a inferência com falha para uma fila de mensagens não entregues.
Como trabalhar com aceleradores
Ao usar aceleradores, muitos gerenciadores de modelos têm configurações específicas que podem ser ativadas. Por exemplo, ao usar uma GPU e pipelines do Hugging
Face, recomendamos definir o parâmetro device como GPU:
mh = HuggingFacePipelineModelHandler('text-classification', device='GPU')
Também recomendamos que você comece com uma única instância de VM e execute o pipeline localmente nela. Para fazer isso, siga as etapas descritas no guia de solução de problemas de GPU. Isso pode reduzir significativamente o tempo necessário para executar um pipeline. Essa abordagem também pode ajudar você a entender melhor a performance do seu trabalho.
Para mais informações sobre o uso de aceleradores no Dataflow, consulte a documentação do Dataflow sobre GPUs e TPUs.
Gerenciamento de dependências
Os pipelines de ML geralmente incluem dependências grandes e importantes, como PyTorch ou TensorFlow. Para gerenciar essas dependências, recomendamos usar contêineres personalizados ao implantar seu job em produção. Isso garante que o job seja executado em um ambiente estável em várias execuções e simplifica a depuração.
Para mais informações sobre gerenciamento de dependências, consulte a página de gerenciamento de dependências do Python do Beam.
A seguir
- Confira os notebooks do Dataflow ML para exemplos práticos.
- Consulte informações detalhadas sobre como usar o ML com o Apache Beam na documentação de pipelines de IA/ML.
- Saiba mais sobre a API
RunInference. - Saiba mais sobre as métricas
que podem ser usadas para monitorar a transformação do
RunInference. - Volte para a página Sobre a ML do Dataflow para conferir uma visão geral dos recursos de ML do Dataflow.