O dimensionamento dinâmico de threads faz parte do conjunto de funcionalidades de dimensionamento vertical do Dataflow. Complementa a funcionalidade de dimensionamento automático horizontal do Dataflow ajustando o número de tarefas paralelas, também conhecidas como pacotes, que cada trabalhador do Dataflow executa. O objetivo é aumentar a eficiência geral do pipeline do Dataflow.
Quando o Dataflow executa um pipeline, o processamento é distribuído por várias máquinas virtuais (VMs) do Compute Engine, também conhecidas como trabalhadores. Um segmento é uma única tarefa executável em execução num processo maior. O Dataflow inicia várias threads em cada trabalhador.
Com o ajuste dinâmico de threads ativado, o serviço Dataflow escolhe automaticamente o número adequado de threads a executar em cada worker do Dataflow. Uma vez que cada thread executa uma tarefa, o aumento do número de threads permite que mais tarefas sejam executadas em paralelo num processo. Quando usa esta funcionalidade com a funcionalidade de escalamento automático horizontal, o número total de threads usadas pelo pipeline permanece o mesmo, mas são usados menos trabalhadores.
O dimensionamento dinâmico de threads usa um algoritmo para determinar quantos threads cada worker precisa com base em sinais de utilização de recursos gerados durante a execução do pipeline. Para mais informações, consulte a secção Como funciona nesta página.
Vantagens
O dynamic thread scaling tem as seguintes potenciais vantagens.
- Permite que os trabalhadores do Dataflow processem dados de forma mais eficiente, melhorando a utilização da CPU e da memória por trabalhador.
- Melhora o processamento paralelo ajustando o número de threads de trabalho disponíveis para executar tarefas em paralelo durante a execução do pipeline.
- Reduz o número de trabalhadores necessários para processar grandes conjuntos de dados, o que pode reduzir os seus custos.
Apoio técnico e limitações
- O dynamic thread scaling está disponível para pipelines que usam os SDKs Java, Python e Go.
- A tarefa do Dataflow tem de usar o Runner v2.
- Apenas são suportados pipelines em lote.
- Os pipelines que exigem muita CPU ou memória podem não beneficiar do dimensionamento dinâmico de threads.
- O dynamic thread scaling não reduz o tempo necessário para a conclusão de uma tarefa do Dataflow.
Como funciona
O dimensionamento dinâmico de threads usa princípios de ajuste automático para dimensionar dinamicamente o número de threads para cima ou para baixo em cada worker no conjunto de workers do Dataflow. A contagem de threads é dimensionada de forma independente em cada trabalhador. Cada thread executa uma tarefa. Aumentar o número de threads permite que mais tarefas sejam executadas em paralelo num worker. À medida que as tarefas são concluídas e os processos já não são necessários, o número de processos diminui. Um algoritmo determina quantos threads cada trabalhador precisa.
O número de threads num worker é aumentado até um máximo de dois threads por vCPU quando são cumpridas as seguintes condições:
- A utilização de memória no trabalhador é inferior a 50%.
- A utilização da CPU no trabalhador é inferior a 65%.
O número de discussões num trabalhador é reduzido para um mínimo de uma discussão por vCPU quando a seguinte condição é cumprida:
- A utilização de memória no trabalhador é superior a 70%.
Para ver a utilização de memória e CPU da sua tarefa, use o separador Métricas da tarefa da interface Web do Dataflow.
Para garantir que as recomendações são válidas, o Dataflow aguarda que a utilização de recursos se estabilize antes de enviar recomendações aos trabalhadores. Por exemplo, a utilização de memória e CPU pode estar dentro do intervalo para o dimensionamento, mas, como a utilização de recursos ainda está a aumentar, o Dataflow não envia uma recomendação. Depois de a utilização de recursos estabilizar, o Dataflow envia uma recomendação.
Se ocorrer um erro de falta de memória (OOM), o dimensionamento de threads é desativado automaticamente e o pipeline é executado com um thread por vCPU.
Ative o dimensionamento dinâmico de threads
Para ativar a dynamic thread scaling, use a opção de serviço do Dataflow seguinte.
Java
--dataflowServiceOptions=enable_dynamic_thread_scaling
Python
--dataflow_service_options=enable_dynamic_thread_scaling
Go
--dataflow_service_options=enable_dynamic_thread_scaling
Quando o dynamic thread scaling está ativado, também pode definir o número inicial e máximo de trabalhadores disponíveis para o seu pipeline durante a execução. Para mais informações, consulte Opções de pipeline.
Verifique se o dynamic thread scaling está ativado
Quando o dynamic thread scaling está ativado, a seguinte mensagem é apresentada nos ficheiros de registo do trabalhador:
Enabling thread vertical scaling feature in worker.
Para ver os ficheiros de registo do trabalhador, no Explorador de registos, use o painel Consulta para filtrar os registos por Nome do registo. Use o seguinte nome de registo no filtro:
projects/PROJECT_ID/logs/dataflow.googleapis.com%2Fharness
Pode ver o número recomendado de discussões nos ficheiros de registo do trabalhador. A mensagem seguinte inclui o número recomendado de discussões:
worker_thread_scaling_report_response { recommended_thread_count: NUMBER }
Se a utilização de recursos não estiver no intervalo de escalamento, o valor apresentado é igual ao número de CPUs virtuais no trabalhador.
Também pode usar a Google Cloud consola para ver se o dimensionamento dinâmico de threads está ativado. Quando está ativada, no painel Informações da tarefa do Dataflow, na linha dataflowServiceOptions da secção Opções do pipeline, é apresentado enable_dynamic_thread_scaling.
Resolução de problemas
Esta secção fornece instruções para resolver problemas comuns relacionados com o ajuste dinâmico de threads.
O desempenho degrada-se com o dynamic thread scaling ativado
O aumento do número de threads pode causar problemas de desempenho nos seguintes casos:
- Quando vários processos tentam usar o mesmo recurso, um processo consegue usar o recurso enquanto os outros têm de esperar. Esta situação é conhecida como concorrência de recursos. Quando ocorre contenção de recursos, o desempenho do pipeline pode diminuir.
- Quando ocorrem erros de falta de memória, o dynamic thread scaling é desativado. Em alguns casos, os erros de falta de memória podem fazer com que o pipeline falhe.
Verifique se o número de fios aumentou. Para obter informações sobre como validar a quantidade de threads recomendada, consulte Valide se o dimensionamento de threads está ativado nesta página.
Se o dimensionamento de threads estiver ativado, para resolver este problema, quando executar o pipeline, não inclua a opção de serviço de dimensionamento de threads dinâmico.
Trabalhador unificado … ativado e desativado
Depois de ativar o dimensionamento dinâmico de threads, a tarefa pode falhar com o seguinte erro:
The workflow could not be created. Causes: (ID): Unified worker misconfigured by user and was both enabled and disabled.
Este erro ocorre quando o Runner v2 está explicitamente desativado.
Para resolver este problema, ative o Runner v2. Para mais informações, consulte a secção Ative o Dataflow Runner v2 na página "Use o Dataflow Runner V2".
Atualize o seu SDK
Depois de ativar o dimensionamento dinâmico de threads, a tarefa pode falhar com o seguinte erro:
Java
Dataflow Runner v2 requires the Apache Beam Java SDK version 2.29.0 or higher. Please upgrade your SDK and resubmit your job.
Python
Dataflow Runner v2 requires the Apache Beam SDK, version 2.21.0 or higher. Please upgrade your SDK and resubmit your job.
Este erro ocorre quando não é possível ativar o Runner v2 porque a versão do SDK não o suporta.
Para resolver este problema, use uma versão do SDK que suporte o Runner v2.
Não é possível ativar a funcionalidade de dimensionamento vertical de discussões
Depois de ativar o dimensionamento dinâmico de threads, a tarefa pode falhar com o seguinte erro:
The workflow could not be created. Causes: (ID): Thread vertical scaling feature can not be enabled while number_of_worker_harness_threads is specified.
Este erro ocorre quando o pipeline define explicitamente o número de threads por trabalhador através da numberOfWorkerHarnessThreads ou da number_of_worker_harness_threads
opção de pipeline.
Para resolver este problema, remova a opção de pipeline numberOfWorkerHarnessThreads ou number_of_worker_harness_threads do seu pipeline.