Em pipelines de streaming com um elevado volume de dados de entrada, existe geralmente uma compensação entre o custo e a latência. Para manter uma latência baixa, o Dataflow tem de adicionar trabalhadores à medida que o volume de tráfego aumenta. Outro fator é a rapidez com que o pipeline deve aumentar ou diminuir a escala em resposta às alterações na taxa de dados de entrada.
O escalador automático do Dataflow tem predefinições adequadas para muitas cargas de trabalho. No entanto, pode querer ajustar este comportamento para o seu cenário específico. Por exemplo, uma latência média mais elevada pode ser aceitável para reduzir os custos, ou pode querer que o Dataflow seja dimensionado mais rapidamente em resposta a picos no tráfego.
Para otimizar o escalamento automático horizontal, pode ajustar os seguintes parâmetros:
- Intervalo de dimensionamento automático: o número mínimo e máximo de trabalhadores a atribuir.
- Sugestão de utilização de trabalhadores: a utilização da CPU alvo para trabalhadores.
- Sugestão de paralelismo do trabalhador: o número alvo de paralelismo para os trabalhadores.
Defina o intervalo de dimensionamento automático
Quando cria uma nova tarefa de streaming, pode definir o número inicial de trabalhadores e o número máximo de trabalhadores. Para o fazer, especifique as seguintes opções de pipeline:
Java
--numWorkers: o número inicial de trabalhadores disponíveis quando o pipeline começa a ser executado--maxNumWorkers: o número máximo de trabalhadores disponíveis para o seu pipeline
Python
--num_workers: o número inicial de trabalhadores disponíveis quando o pipeline começa a ser executado--max_num_workers: o número máximo de trabalhadores disponíveis para o seu pipeline
Go
--num_workers: o número inicial de trabalhadores disponíveis quando o pipeline começa a ser executado--max_num_workers: o número máximo de trabalhadores disponíveis para o seu pipeline
Para tarefas de streaming que usam o Streaming Engine, o sinalizador --maxNumWorkers é opcional. A predefinição é 100. Para tarefas de streaming que não usam o Streaming Engine, o --maxNumWorkers é necessário quando a escala automática horizontal está ativada.
O valor inicial de --maxNumWorkers também determina quantos
discos persistentes são atribuídos à tarefa.
Os pipelines são implementados com um conjunto fixo de discos persistentes, igual em número a
--maxNumWorkers. Durante o streaming, os discos persistentes são redistribuídos de forma que cada trabalhador receba um número igual de discos anexados.
Se definir --maxNumWorkers, certifique-se de que o valor fornece discos suficientes para o seu pipeline. Considere o crescimento futuro ao definir o valor inicial. Para obter informações
acerca do desempenho do disco persistente, consulte o artigo
Configure o disco persistente e as VMs.
O Dataflow fatura a utilização do Persistent Disk e tem quotas do Compute Engine, incluindo quotas do Persistent Disk.
Por predefinição, o número mínimo de trabalhadores é 1 para tarefas de streaming que usam o Streaming Engine e (maxNumWorkers/15), arredondado para cima, para tarefas que não usam o Streaming Engine.
Atualize o intervalo de dimensionamento automático
Para tarefas que usam o Streaming Engine, pode ajustar o número mínimo e máximo de trabalhadores sem parar nem substituir a tarefa. Para ajustar estes valores, use uma atualização de tarefa em curso. Atualize as seguintes opções de emprego:
--min-num-workers: o número mínimo de trabalhadores.--max-num-workers: o número máximo de trabalhadores.
gcloud
Use o comando gcloud dataflow jobs update-options:
gcloud dataflow jobs update-options \ --region=REGION \ --min-num-workers=MINIMUM_WORKERS \ --max-num-workers=MAXIMUM_WORKERS \ JOB_ID
Substitua o seguinte:
- REGION: o ID da região do ponto final regional da tarefa
- MINIMUM_WORKERS: o número mínimo de instâncias do Compute Engine
- MAXIMUM_WORKERS: o número máximo de instâncias do Compute Engine
- JOB_ID: o ID da tarefa a atualizar
Também pode atualizar --min-num-workers e --max-num-workers
individualmente.
REST
Use o método
projects.locations.jobs.update:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": MINIMUM_WORKERS, "max_num_workers": MAXIMUM_WORKERS } }
Substitua o seguinte:
- PROJECT_ID: o Google Cloud ID do projeto da tarefa do Dataflow
- REGION: o ID da região do ponto final regional da tarefa
- JOB_ID: o ID da tarefa a atualizar
- MINIMUM_WORKERS: o número mínimo de instâncias do Compute Engine
- MAXIMUM_WORKERS: o número máximo de instâncias do Compute Engine
Também pode atualizar min_num_workers e max_num_workers individualmente.
Especifique os parâmetros a atualizar no parâmetro de consulta updateMask e inclua os valores atualizados no campo runtimeUpdatableParams do corpo do pedido. O exemplo seguinte atualiza min_num_workers:
PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": 5 } }
Para tarefas que não usam o Streaming Engine, pode substituir a tarefa existente por um valor atualizado de maxNumWorkers.
Se atualizar uma tarefa de streaming que não esteja a usar o Streaming Engine, a tarefa atualizada tem o ajuste de escala automático horizontal desativado por predefinição. Para manter o ajuste de escala automático ativado,
especifique --autoscalingAlgorithm e --maxNumWorkers para a tarefa atualizada.
Defina a sugestão de utilização de trabalhadores
O Dataflow usa a utilização média da CPU como um sinal para quando aplicar a escala automática horizontal. Por predefinição, o Dataflow define uma utilização de CPU alvo de 0,8. Quando a utilização fica fora deste intervalo, o Dataflow pode adicionar ou remover trabalhadores.
Para um maior controlo sobre o comportamento do dimensionamento automático, pode definir a utilização da CPU alvo para um valor no intervalo [0,1, 0,9].
Defina um valor de utilização da CPU mais baixo se quiser alcançar latências de pico mais baixas. Um valor mais baixo permite que o Dataflow seja expandido de forma mais agressiva em resposta ao aumento da utilização dos trabalhadores e reduzido de forma mais conservadora para melhorar a estabilidade. Um valor mais baixo também oferece mais espaço livre quando o pipeline está em execução num estado estável, o que geralmente resulta numa latência de cauda mais baixa. (A latência final mede os tempos de espera mais longos antes de um novo registo ser processado.)
Defina um valor mais elevado se quiser poupar recursos e manter os custos mais baixos quando o tráfego aumentar. Um valor mais elevado impede o aumento excessivo da escala, à custa de uma latência mais elevada.
Para configurar a sugestão de utilização quando executa uma tarefa sem modelo, defina a worker_utilization_hint
opção de serviço. Para uma tarefa de modelo,
atualize a sugestão de utilização. As opções de serviço não são
suportadas.
O exemplo seguinte mostra como usar worker_utilization_hint:
Java
--dataflowServiceOptions=worker_utilization_hint=TARGET_UTILIZATION
Substitua TARGET_UTILIZATION por um valor no intervalo [0,1, 0,9].
Python
--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION
Substitua TARGET_UTILIZATION por um valor no intervalo [0,1, 0,9].
Go
--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION
Substitua TARGET_UTILIZATION por um valor no intervalo [0,1, 0,9].
Para novos pipelines, recomendamos que faça testes com cargas realistas usando a definição predefinida. Em seguida, avalie o comportamento do dimensionamento automático à medida que se aplica ao seu pipeline e faça os ajustes necessários.
A sugestão de utilização é apenas um fator que o Dataflow usa quando decide se deve dimensionar os trabalhadores. Outros fatores, como a acumulação e as chaves disponíveis, podem substituir o valor da sugestão. Além disso, a sugestão não é um alvo rigoroso. O escalador automático tenta manter a utilização da CPU dentro do intervalo do valor da sugestão, mas a métrica de utilização agregada pode ser superior ou inferior. Para mais informações, consulte o artigo Heurísticas de escalabilidade automática de streaming.
Atualize a sugestão de utilização
Para atualizar a sugestão de utilização enquanto uma tarefa está em execução, faça uma atualização em tempo real da seguinte forma:
gcloud
Use o comando
gcloud dataflow jobs update-options:
gcloud dataflow jobs update-options \ --region=REGION \ --worker-utilization-hint=TARGET_UTILIZATION \ JOB_ID
Substitua o seguinte:
- REGION: o ID da região do ponto final regional da tarefa
- JOB_ID: o ID da tarefa a atualizar
- TARGET_UTILIZATION: um valor no intervalo [0,1, 0,9]
Para repor a sugestão de utilização para o valor predefinido, use o seguinte comando gcloud:
gcloud dataflow jobs update-options \ --unset-worker-utilization-hint \ --region=REGION \ --project=PROJECT_ID \ JOB_ID
REST
Use o método
projects.locations.jobs.update:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.worker_utilization_hint { "runtime_updatable_params": { "worker_utilization_hint": TARGET_UTILIZATION } }
Substitua o seguinte:
- PROJECT_ID: o Google Cloud ID do projeto da tarefa do Dataflow.
- REGION: o ID da região do ponto final regional da tarefa.
- JOB_ID: o ID da tarefa a atualizar.
- TARGET_UTILIZATION: um valor no intervalo [0,1, 0,9]
Definir sugestão de paralelismo do trabalhador
Para processar o escalamento automático com operações longas que dependem menos das CPUs, como cargas de trabalho intensivas de ML, pode definir a sugestão de paralelismo do trabalhador através de sugestões de recursos do Apache Beam. Estas sugestões mudam o dimensionamento automático para um modo diferente otimizado para cargas de trabalho com utilização intensiva da GPU ou transformações com um longo tempo de processamento.
O exemplo seguinte mostra como anexar uma sugestão de paralelismo a uma transformação:
Java
pcoll.apply(MyCompositeTransform.of(...)
.setResourceHints(
ResourceHints.create()
.withMaxActiveBundlesPerWorker(TARGET_PARALLELISM_PER_WORKER)))
Substitua TARGET_PARALLELISM_PER_WORKER por um valor adequado ao seu exemplo de utilização. Para orientações gerais, consulte como escolher um bom valor inicial.
Python
pcoll | MyPTransform().with_resource_hints(
max_active_bundles_per_worker="TARGET_PARALLELISM_PER_WORKER")
Substitua TARGET_PARALLELISM_PER_WORKER por um valor adequado ao seu exemplo de utilização. Para orientações gerais, consulte como escolher um bom valor inicial.
Escolha o valor da sugestão de paralelismo do trabalhador
Para exemplos de utilização de ML, um bom valor inicial é equivalente ao número de modelos em execução em paralelo em cada trabalhador. Este valor é limitado pela capacidade dos aceleradores no trabalhador e pelo tamanho do modelo.
Para outros exemplos de utilização, o pipeline está limitado pela memória ou pela CPU. Para pipelines com restrições de memória, use o limite de memória para calcular o processamento paralelo máximo. Para pipelines com restrições de CPU, recomenda-se que mantenha a política de escalamento automático predefinida, em vez de fornecer uma sugestão de paralelismo.
É possível ajustar o valor para satisfazer as necessidades de processamento de outras fases, como a escrita num destino. Aumentar o valor em 1 ou 2 quando o paralelismo do modelo é 2 ajuda a reconhecer o tempo de processamento mais rápido da escrita no destino, dando-lhe mais folga para ter em conta o processamento feito noutras fases. Se o seu pipeline não envolver a mistura e as transformações forem fundidas num único estágio, não precisa de ajustar o valor para outras transformações.
Este valor também pode ser ajustado para simular os efeitos dos atrasos aceitáveis na acumulação de trabalho. Por exemplo, se não se importar com um atraso máximo de 10 minutos e o tempo de processamento médio do seu modelo for de 1 minuto, pode optar por aumentar o valor em 1, partindo do princípio de que o número máximo de trabalhadores está definido como 10.
Heurísticas de escala automática com utilização intensiva da GPU
Na definição com utilização intensiva da GPU indicada através da sugestão de paralelismo de definição, o Dataflow tem vários fatores em consideração quando dimensiona automaticamente. Estes fatores incluem:
- Chaves disponíveis. As chaves são a unidade fundamental de paralelismo no Dataflow.
- Número máximo de pacotes ativos por trabalhador. Isto sugere o número ideal máximo de paralelismo de processamento no worker.
A ideia geral por detrás das decisões de escalabilidade é calcular os trabalhadores necessários para processar a carga atual, conforme indicado pelas chaves disponíveis. Por exemplo, se houver 100 chaves disponíveis para processamento e o paralelismo máximo por trabalhador for 10, deve ter um total de 10 trabalhadores.
Se o seu pipeline for complexo e tiver uma carga de trabalho pesada com utilização intensiva da GPU e várias transformações com utilização intensiva da CPU, recomendamos que ative o ajuste adequado. Isto permite que o serviço faça uma boa distinção entre o trabalho que requer muita CPU e o trabalho que requer muita GPU e, em seguida, dimensione cada conjunto de trabalhadores em conformidade.
Heurísticas de escala automática de streaming
Para pipelines de streaming, o objetivo da escala automática horizontal é minimizar o backlog, ao mesmo tempo que maximiza a utilização e o débito dos trabalhadores, e reagir rapidamente a picos de carga.
O Dataflow tem vários fatores em consideração quando dimensiona automaticamente, incluindo:
Backlog. O tempo de pendências estimado é calculado a partir do débito e dos bytes de pendências ainda a processar a partir da origem de entrada. Uma conduta é considerada em atraso quando o tempo de atraso estimado permanece acima de 15 segundos.
Utilização da CPU alvo. O alvo predefinido para a utilização média da CPU é 0,8. Pode substituir este valor.
Chaves disponíveis. As chaves são a unidade fundamental de paralelismo no Dataflow.
Em alguns casos, o Dataflow usa os seguintes fatores nas decisões de escalabilidade automática. Se estes fatores forem usados para o seu trabalho, pode ver essas informações no separador Ajuste automático de escala.
A limitação baseada em chaves usa o número de chaves de processamento recebidas pela tarefa para calcular o limite para os trabalhadores do utilizador, porque cada chave só pode ser processada por um trabalhador de cada vez.
Redução da atenuação. Se o Dataflow detetar que ocorreram decisões de escalamento automático instáveis, diminui a taxa de redução de escala para melhorar a estabilidade.
A atualização com base na CPU usa uma utilização elevada da CPU como critério de atualização.
Para tarefas de streaming que não usam o Streaming Engine, a escalabilidade pode ser limitada pelo número de discos persistentes. Para mais informações, consulte o artigo Defina o intervalo de dimensionamento automático.
Redimensionamento automático com utilização intensiva da GPU, se ativado através da definição da sugestão de paralelismo do trabalhador. Para mais informações, consulte o artigo Heurísticas de escalamento automático intensivas da GPU
Aumentar a escala. Se um pipeline de streaming permanecer em atraso com paralelismo suficiente nos trabalhadores durante vários minutos, o Dataflow é dimensionado para cima. O fluxo de dados tenta limpar o atraso no prazo de aproximadamente 150 segundos após o aumento da escala, tendo em conta o débito atual por trabalhador. Se houver um atraso, mas o trabalhador não tiver paralelismo suficiente para trabalhadores adicionais, o pipeline não é dimensionado. (Aumentar o número de trabalhadores além do número de chaves disponíveis para processamento paralelo não ajuda a processar o atraso mais rapidamente.)
Redução da escala: quando o escalador automático toma uma decisão de redução da escala, a fila de tarefas pendentes é o fator de maior prioridade. O ajuste de escala automático visa um atraso de, no máximo, 15 segundos. Se a fila de tarefas pendentes descer abaixo de 10 segundos e a utilização média dos trabalhadores for inferior ao objetivo de utilização da CPU, o Dataflow é reduzido. Desde que o atraso seja aceitável, o dimensionamento automático tenta manter a utilização da CPU próxima da utilização da CPU alvo. No entanto, se a utilização já estiver suficientemente próxima do objetivo, o escalador automático pode manter o número de trabalhadores inalterado, porque cada passo de redução tem um custo.
O Streaming Engine também usa uma técnica de escalabilidade automática preditiva baseada no atraso do temporizador. Os dados ilimitados num pipeline de streaming são divididos em janelas agrupadas por datas/horas. No final de um período, os temporizadores são acionados para cada chave que está a ser processada nesse período. O acionamento de um temporizador indica que o período expirou para uma determinada chave. O Streaming Engine pode medir o atraso do temporizador e prever quantos temporizadores serão acionados no final de uma janela. Ao usar o backlog do temporizador como um sinal, o Dataflow pode estimar a quantidade de processamento que tem de ocorrer quando os temporizadores futuros são acionados. Com base na carga futura estimada, o Dataflow é dimensionado automaticamente antecipadamente para satisfazer a procura esperada.
Métrica
Para encontrar os limites de dimensionamento automático atuais de uma tarefa, consulte as seguintes métricas:
job/max_worker_instances_limit: número máximo de trabalhadores.job/min_worker_instances_limit: número mínimo de trabalhadores.
Para obter informações sobre a utilização dos trabalhadores, consulte as seguintes métricas:
job/aggregated_worker_utilization: a utilização agregada dos trabalhadores.job/worker_utilization_hint: a sugestão de utilização atual do trabalhador.
Para aceder a estatísticas sobre o comportamento do escalador automático, consulte a seguinte métrica:
job.worker_utilization_hint_is_actively_used: indica se o escalador automático está a usar ativamente a sugestão de utilização do trabalhador. Se outros fatores substituírem a sugestão quando esta métrica é amostrada, o valor éfalse.job/horizontal_worker_scaling: descreve as decisões tomadas pelo ajuste automático de escala. Esta métrica contém as seguintes etiquetas:direction: especifica se o redimensionador automático aumentou ou diminuiu a escala, ou não tomou nenhuma medida.rationale: especifica a fundamentação da decisão do escalador automático.
Para mais informações, consulte o artigo Métricas do Cloud Monitoring. Estas métricas também são apresentadas nos gráficos de monitorização do ajuste automático.
O que se segue?
- Monitorize o ajuste de escala automático do Dataflow
- Resolva problemas de dimensionamento automático do Dataflow