Ao usar o Dataflow para inferência de ML, recomendamos usar a transformação
RunInference. Usar essa transformação traz vários benefícios, incluindo:
- Gerenciamento inteligente de memória de modelo otimizado para um worker do Dataflow ao realizar inferência local.
- Agrupamento dinâmico, que usa características do pipeline e restrições definidas pelo usuário para otimizar a performance.
- Recursos de back-end do Dataflow com reconhecimento de ML que podem oferecer melhor capacidade de processamento e latência.
- Mecanismos inteligentes de espera e autoescalonamento ao encontrar cotas de inferência remota.
- Métricas prontas para Production e recursos operacionais.
Ao usar RunInference, considere o seguinte:
Gerenciamento de memória
Ao carregar um modelo de ML médio ou grande, sua máquina pode ficar sem memória. O Dataflow oferece ferramentas para evitar erros de falta de memória (OOM) ao carregar modelos de ML.
Para ajustar seu pipeline de maneira eficaz, é importante entender as diferentes unidades de computação no Dataflow:
- VM: a instância de máquina virtual (tipo de máquina), como
n1-standard-8. Na documentação do Dataflow, isso às vezes é chamado de worker. - Processo: um processo de trabalho do SDK do Python. Por padrão, o Dataflow inicia um processo por vCPU na VM.
- Thread: uma única tarefa executável em um processo. Cada linha de execução
executa um
DoFn.
Memória do modelo x memória de dados
Os erros de falta de memória geralmente se enquadram em duas categorias:
- Problemas de memória relacionados ao modelo: ocorrem quando o modelo de ML é muito grande para caber várias cópias na memória.
- Problemas de memória relacionados a dados: ocorrem quando o processamento de grandes quantidades de dados (não relacionados ao tamanho do modelo) excede a memória disponível.
Use a tabela a seguir para determinar a abordagem adequada ao seu cenário.
| Cenário | Solução |
|---|---|
| Os modelos são pequenos o suficiente para caber na memória. |
Use a transformação RunInference sem outras configurações. A transformação RunInference compartilha os modelos entre
threads. Se for possível ajustar um modelo por núcleo de CPU na máquina, o pipeline poderá usar a configuração padrão. Isso carrega uma cópia do modelo por processo.
|
| Vários modelos treinados de maneira diferente estão realizando a mesma tarefa. | Use chaves por modelo. Para mais informações, consulte Executar inferência de ML com vários modelos treinados de maneira diferente. |
| Um modelo é carregado na memória, e todos os processos em uma VM compartilham esse modelo. |
Use Se você estiver criando um manipulador de modelo personalizado, em vez de usar o parâmetro |
| Você precisa configurar o número exato de modelos carregados na sua máquina. |
Para controlar exatamente quantos modelos são carregados por VM, defina o
Se você estiver criando um gerenciador de modelos personalizado, substitua o
parâmetro |
| Você está ficando sem memória devido ao processamento de dados, não ao tamanho do modelo (os dados excedem a memória). |
Tente as seguintes opções em ordem:
|
Para mais informações sobre gerenciamento de memória com o Dataflow, consulte Resolver problemas de falta de memória no Dataflow.
Lote
Há muitas maneiras de fazer o agrupamento em lote no Beam, mas ao realizar a inferência, recomendamos que você deixe a transformação RunInference lidar com o agrupamento em lote. Se o modelo tiver o melhor desempenho com um tamanho de lote específico, considere restringir os parâmetros de tamanho de lote de destino de RunInference. A maioria dos manipuladores de modelos expõe os tamanhos máximo e mínimo de lote como parâmetros. Por exemplo, para controlar o tamanho do lote
inserido em um pipeline do HuggingFace, você pode definir o seguinte manipulador de modelo:
mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16)
A transformação RunInference sempre respeita o tamanho máximo do lote. O tamanho mínimo do lote é uma meta, mas não há garantia de que será respeitado em todos os casos. Por exemplo, consulte Lotes baseados em pacotes na seção a seguir.
Lote com base em pacote
O Dataflow transmite dados para transformações em pacotes. Esses pacotes podem variar de tamanho dependendo das heurísticas definidas pelo Dataflow. Normalmente, os pacotes em pipelines em lote são bem grandes (O(100s) elementos), enquanto nos pipelines de streaming eles podem ser bem pequenos (incluindo tamanho 1).
Por padrão, o RunInference gera lotes de cada pacote e não
faz lotes entre pacotes. Isso significa que, se você tiver um tamanho mínimo de lote de 8, mas apenas 3 elementos restantes no pacote, RunInference usará um tamanho de lote de 3. A maioria dos manipuladores de modelos expõe um parâmetro max_batch_duration_secs que permite substituir esse comportamento. Se max_batch_duration_secs estiver definido, RunInference
será agrupado em lotes em vários pacotes. Se a transformação não conseguir atingir o tamanho do lote desejado com um único pacote, ela vai esperar no máximo max_batch_duration_secs antes de gerar um lote. Por exemplo, para ativar o processamento em lote entre pacotes ao usar um pipeline do HuggingFace, defina o seguinte manipulador de modelo:
mh = HuggingFacePipelineModelHandler('text-classification', min_batch_size=4, max_batch_size=16, max_batch_duration_secs=3)
Esse recurso é útil se você tiver tamanhos de lote muito pequenos no pipeline. Caso contrário, o custo de sincronização para processar em lote em vários pacotes geralmente não vale a pena usar, porque pode causar um embaralhamento caro.
Como corrigir falhas
O tratamento de erros é uma parte importante de qualquer pipeline de produção. O Dataflow processa elementos em pacotes arbitrários e repete o pacote completo se um erro ocorrer em qualquer elemento dele. Se você não aplicar um tratamento de erros adicional, o Dataflow vai repetir os pacotes que incluem um item com falha quatro vezes ao executar no modo em lote. O pipeline falha completamente quando um único pacote falha quatro vezes. Em execuções no modo de streaming, o Dataflow repete um pacote que inclui um item com falha indefinidamente, o que pode causar a parada permanente do pipeline.
O RunInference oferece um mecanismo de tratamento de erros integrado com a
função with_exception_handling.
Quando você aplica essa função, ela encaminha todas as falhas para um PCollection separado, junto com as mensagens de erro. Isso permite que você os reprocesse. Se você associar operações de pré-processamento ou pós-processamento ao manipulador de modelos, o RunInference também vai encaminhar essas operações para a coleta de falhas. Por exemplo, para reunir todas as falhas de um gerenciador de modelos com operações de pré-processamento e pós-processamento, use a seguinte lógica:
main, other = pcoll | RunInference(model_handler.with_preprocess_fn(f1).with_postprocess_fn(f2)).with_exception_handling()
# handles failed preprocess operations, indexed in the order in which they were applied
other.failed_preprocessing[0] | beam.Map(logging.info)
# handles failed inferences
other.failed_inferences | beam.Map(logging.info)
# handles failed postprocess operations, indexed in the order in which they were applied
other.failed_postprocessing[0] | beam.Map(logging.info)
Tempo limite
Ao usar o recurso with_exception_handling do RunInference, você também pode definir um tempo limite para cada operação, que é contado por lote. Isso evita que uma única inferência travada deixe todo o pipeline sem resposta. Se um
tempo limite ocorrer, o registro com tempo esgotado será encaminhado para a falha PCollection, todo o
estado do modelo será limpo e recriado, e a execução normal continuará.
# Timeout execution after 60 seconds
main, other = pcoll | RunInference(model_handler).with_exception_handling(timeout=60)
A partir do Beam 2.68.0, também é possível especificar um tempo limite usando a
opção de pipeline --element_processing_timeout_minutes. Nesse caso, um tempo limite faz com que um item de trabalho com falha seja repetido até ser concluído, em vez de rotear a inferência com falha para uma fila de mensagens não entregues.
Como trabalhar com aceleradores
Ao usar aceleradores, muitos gerenciadores de modelos têm configurações específicas que podem ser ativadas. Por exemplo, ao usar uma GPU e pipelines do Hugging
Face, recomendamos definir o parâmetro device como GPU:
mh = HuggingFacePipelineModelHandler('text-classification', device='GPU')
Se você estiver carregando mais de uma cópia do modelo em uma única GPU (por exemplo, usando model_copies > 1), recomendamos ativar o NVIDIA Multi-Process Service (MPS). O MPS permite que vários processos compartilhem recursos de GPU simultaneamente, o que pode melhorar significativamente a capacidade e o paralelismo. Para mais informações, consulte Melhorar o desempenho em uma GPU compartilhada usando o NVIDIA MPS.
Também recomendamos que você comece com uma única instância de VM e execute o pipeline localmente nela. Para fazer isso, siga as etapas descritas no guia de solução de problemas de GPU. Isso pode reduzir significativamente o tempo necessário para executar um pipeline. Essa abordagem também ajuda você a entender melhor a performance do seu trabalho.
Para mais informações sobre o uso de aceleradores no Dataflow, consulte a documentação do Dataflow sobre GPUs e TPUs.
Gerenciamento de dependências
Os pipelines de ML geralmente incluem dependências grandes e importantes, como PyTorch ou TensorFlow. Para gerenciar essas dependências, recomendamos usar contêineres personalizados ao implantar seu job em produção. Isso garante que o job seja executado em um ambiente estável em várias execuções e simplifica a depuração.
Para mais informações sobre gerenciamento de dependências, consulte a página de gerenciamento de dependências do Python do Beam.
A seguir
- Confira os notebooks do Dataflow ML para exemplos práticos.
- Consulte informações detalhadas sobre como usar o ML com o Apache Beam na documentação de pipelines de IA/ML.
- Saiba mais sobre a API
RunInference. - Saiba mais sobre as métricas
que podem ser usadas para monitorar a transformação do
RunInference. - Volte para a página Sobre a ML do Dataflow para conferir uma visão geral dos recursos de ML do Dataflow.