Esta página descreve como encontrar e resolver erros de falta de memória (OOM) no Dataflow.
Encontre erros de falta de memória
Para determinar se a sua pipeline está a ficar sem memória, use um dos seguintes métodos.
- Na página Detalhes da tarefa, no painel Registos, veja o separador Diagnósticos. Este separador apresenta erros relacionados com problemas de memória e a frequência com que ocorrem.
- Na interface de monitorização do Dataflow, use o gráfico Utilização de memória para monitorizar a capacidade e a utilização de memória dos trabalhadores.
- Na página Detalhes das tarefas, no painel Registos, selecione Registos do trabalhador para encontrar erros de falta de memória nos registos do trabalhador.
Os erros de falta de memória também podem aparecer nos registos do sistema. Para os ver, navegue para o Explorador de registos e use a seguinte consulta:
resource.type="dataflow_step" resource.labels.job_id="JOB_ID" "out of memory" OR "OutOfMemory" OR "Shutting down JVM"Substitua JOB_ID pelo ID da sua tarefa.
Para tarefas Java, o monitor de memória Java comunica periodicamente métricas de recolha de lixo. Se a fração do tempo da CPU usado para a recolha de lixo exceder um limite de 50% durante um período prolongado, o ambiente de teste do SDK falha. Pode ser apresentado um erro semelhante ao exemplo seguinte:
Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = ...Este erro pode ocorrer quando a memória física ainda está disponível e, normalmente, indica que a utilização de memória do pipeline é ineficiente. Para resolver este problema, otimize o seu pipeline.
O monitor de memória Java é configurado pela interface
MemoryMonitorOptions.
Se o seu trabalho tiver uma utilização elevada de memória ou erros de falta de memória, siga as recomendações nesta página para otimizar a utilização de memória ou aumentar a quantidade de memória disponível.
Resolva erros de falta de memória
As alterações ao pipeline do Dataflow podem resolver erros de falta de memória ou reduzir a utilização de memória. As possíveis alterações incluem as seguintes ações:
- Otimize o seu pipeline
- Reduza o número de discussões
- Use um tipo de máquina com mais memória por vCPU
O diagrama seguinte mostra o fluxo de trabalho de resolução de problemas do Dataflow descrito nesta página.
Experimente as seguintes mitigações:
- Se possível, otimize o seu pipeline para reduzir a utilização de memória.
- Se a tarefa for uma tarefa em lote, experimente os seguintes passos pela ordem indicada:
- Use um tipo de máquina com mais memória por vCPU.
- Reduza o número de threads para menos do que a contagem de vCPUs por trabalhador.
- Use um tipo de máquina personalizado com mais memória por vCPU.
- Se a tarefa for uma tarefa de streaming que usa Python, reduza o número de threads para menos de 12.
- Se o trabalho for um trabalho de streaming que usa Java ou Go, experimente o seguinte:
- Reduza o número de threads para menos de 500 para tarefas do Runner v2 ou menos de 300 para tarefas que não usam o Runner v2.
- Use um tipo de máquina com mais memória.
Otimize o seu pipeline
Várias operações de pipeline podem causar erros de falta de memória. Esta secção oferece opções para reduzir a utilização de memória da sua pipeline. Para identificar as fases do pipeline que consomem mais memória, use o Cloud Profiler para monitorizar o desempenho do pipeline.
Pode usar as seguintes práticas recomendadas para otimizar o seu pipeline:
- Use conetores de E/S integrados do Apache Beam para ler ficheiros
- Redesenhe as operações quando usar
GroupByKeyPTransforms - Reduza os dados de entrada de origens externas
- Partilhe objetos entre threads
- Use representações de elementos eficientes em termos de memória
- Reduza o tamanho das entradas laterais
- Use DoFns divisíveis do Apache Beam
Use conetores de E/S incorporados do Apache Beam para ler ficheiros
Não abra ficheiros grandes dentro de um DoFn. Para ler ficheiros, use os conetores de E/S integrados do Apache Beam.
Os ficheiros abertos num DoFn têm de caber na memória. Uma vez que são executadas várias instâncias do DoFn em simultâneo, os ficheiros grandes abertos no DoFn podem causar erros de falta de memória.
Redesenhe as operações quando usar PTransforms GroupByKey
Quando usa uma GroupByKey PTransform no Dataflow, os valores resultantes por chave e por período são processados numa única thread. Uma vez que estes dados são transmitidos como um fluxo do serviço de back-end do Dataflow para os trabalhadores, não precisam de caber na memória do trabalhador. No entanto, se os valores forem recolhidos na memória, a lógica de processamento pode causar erros de falta de memória.
Por exemplo, se tiver uma chave que contenha dados para uma janela e adicionar os valores das chaves a um objeto na memória, como uma lista, podem ocorrer erros de falta de memória. Neste cenário, o trabalhador pode não ter capacidade de memória suficiente para armazenar todos os objetos.
Para mais informações sobre as GroupByKeyPTransforms, consulte a documentação do Apache Beam
Python GroupByKey
e Java GroupByKey.
A lista seguinte contém sugestões para conceber o seu pipeline de modo a minimizar o consumo de memória quando usar GroupByKey PTransforms.
- Para reduzir a quantidade de dados por chave e por janela, evite chaves com muitos valores, também conhecidas como chaves populares.
- Para reduzir a quantidade de dados recolhidos por janela, use um tamanho de janela mais pequeno.
- Se estiver a usar valores de uma chave numa janela para calcular um número, use uma transformação
Combine. Não faça o cálculo numa única instânciaDoFndepois de recolher os valores. - Filtrar valores ou duplicados antes do processamento. Para mais informações, consulte a documentação de transformação do
Python
Filtere do JavaFilter.
Reduza os dados de entrada de origens externas
Se estiver a fazer chamadas para uma API externa ou uma base de dados para enriquecimento de dados,
os dados devolvidos têm de caber na memória do trabalhador.
Se estiver a processar chamadas em lote, recomendamos que use uma transformação GroupIntoBatches.
Se encontrar erros de falta de memória, reduza o tamanho do lote. Para mais informações
sobre o agrupamento em lotes, consulte a
documentação de transformação do Python GroupIntoBatches
e do Java GroupIntoBatches.
Partilhe objetos entre threads
A partilha de um objeto de dados na memória entre instâncias do DoFn pode melhorar o espaço e a eficiência do acesso. Os objetos de dados criados em qualquer método da DoFn, incluindo
Setup, StartBundle, Process, FinishBundle e Teardown, são invocados
para cada DoFn. No Dataflow, cada trabalhador pode ter várias DoFn
instâncias. Para uma utilização de memória mais eficiente, transmita um objeto de dados como um singleton para o partilhar em vários DoFns. Para mais informações, consulte a publicação no blogue
Reutilização da cache em vários DoFns.
Use representações de elementos eficientes em termos de memória
Avalie se pode usar representações para PCollection
elementos que usam menos memória. Quando usar codificadores no seu pipeline, considere não só as representações de elementos codificadas, mas também as descodificadas PCollection. As matrizes esparsas podem beneficiar frequentemente deste tipo de otimização.
Reduza o tamanho das entradas laterais
Se os seus DoFns usarem entradas laterais, reduza o tamanho da entrada lateral. Para entradas laterais que são coleções de elementos, considere usar vistas iteráveis, como AsIterable ou AsMultimap, em vez de vistas que materializam toda a entrada lateral ao mesmo tempo, como AsList.
Reduza o número de discussões
Pode aumentar a memória disponível por discussão reduzindo o número máximo de discussões que executam instâncias DoFn. Esta alteração reduz o paralelismo, mas
disponibiliza mais memória para cada DoFn.
A tabela seguinte mostra o número predefinido de threads que o Dataflow cria:
| Tipo de tarefa | SDK Python | SDKs Java/Go |
|---|---|---|
| Lote | 1 thread por vCPU | 1 thread por vCPU |
| Streaming com o Runner v2 | 12 discussões por vCPU | 500 threads por VM trabalhadora |
| Streaming sem Runner v2 | 12 discussões por vCPU | 300 threads por VM trabalhadora |
Para reduzir o número de threads do SDK Apache Beam, defina a seguinte opção de pipeline:
Java
Use a opção de pipeline --numberOfWorkerHarnessThreads.
Python
Use a opção de pipeline --number_of_worker_harness_threads.
Go
Use a opção de pipeline --number_of_worker_harness_threads.
Para tarefas em lote, defina o valor para um número inferior ao número de vCPUs.
Para tarefas de streaming, comece por reduzir o valor para metade do valor predefinido. Se este passo não mitigar o problema, continue a reduzir o valor em metade, observando os resultados em cada passo. Por exemplo, quando usar Python, experimente os valores 6, 3 e 1.
Use um tipo de máquina com mais memória por vCPU
Para selecionar um worker com mais memória por vCPU, use um dos seguintes métodos.
- Use um tipo de máquina com muita memória na família de máquinas de utilização geral. Os tipos de máquinas com muita memória têm mais memória por vCPU do que os tipos de máquinas padrão. A utilização de um tipo de máquina com muita memória aumenta a memória disponível para cada trabalhador e a memória disponível por thread, uma vez que o número de vCPUs permanece o mesmo. Como resultado, usar um tipo de máquina com muita memória pode ser uma forma rentável de selecionar um trabalhador com mais memória por vCPU.
- Para ter mais flexibilidade ao especificar o número de vCPUs e a quantidade de memória, pode usar um tipo de máquina personalizado. Com os tipos de máquinas personalizados, pode aumentar a memória em incrementos de 256 MB. Estes tipos de máquinas têm preços diferentes dos tipos de máquinas padrão.
- Algumas famílias de máquinas permitem-lhe usar tipos de máquinas personalizados com memória expandida. A memória expandida permite uma relação memória/vCPU mais elevada. O custo é mais elevado.
Para definir tipos de trabalhadores, use a seguinte opção de pipeline. Para mais informações, consulte Definir opções de pipeline e Opções de pipeline.
Java
Use a opção de pipeline --workerMachineType.
Python
Use a opção de pipeline --machine_type.
Go
Use a opção de pipeline --worker_machine_type.
Use apenas um processo do SDK do Apache Beam
Para pipelines de streaming Python e pipelines Python que usam o Runner v2, pode forçar o Dataflow a iniciar apenas um processo do SDK do Apache Beam por trabalhador. Antes de experimentar esta opção, tente primeiro resolver o problema através dos outros métodos. Para configurar as VMs de trabalho do Dataflow para iniciar apenas um processo Python contentorizado, use a seguinte opção de pipeline:
--experiments=no_use_multiple_sdk_containers
Com esta configuração, os pipelines Python criam um processo do SDK do Apache Beam por trabalhador. Esta configuração impede que os objetos e os dados partilhados sejam replicados várias vezes para cada processo do SDK do Apache Beam. No entanto, limita a utilização eficiente dos recursos de computação disponíveis no trabalhador.
A redução do número de processos do SDK Apache Beam para um não reduz necessariamente o número total de threads iniciados no worker. Além disso, ter todos os threads num único processo do SDK do Apache Beam pode causar um processamento lento ou fazer com que o pipeline fique bloqueado. Por conseguinte, também pode ter de reduzir o número de linhas de execução, conforme descrito na secção Reduza o número de linhas de execução desta página.
Também pode forçar os trabalhadores a usar apenas um processo do SDK do Apache Beam usando um tipo de máquina com apenas um vCPU.
Compreenda a utilização de memória do Dataflow
Para resolver problemas de erros de falta de memória, é útil compreender como os pipelines do Dataflow usam a memória.
Quando o Dataflow executa um pipeline, o processamento é distribuído por várias máquinas virtuais (VMs) do Compute Engine, frequentemente denominadas trabalhadores.
Os trabalhadores processam itens de trabalho do serviço Dataflow e delegam os itens de trabalho nos processos do SDK do Apache Beam. Um processo do SDK do Apache Beam cria instâncias de DoFns. DoFn é uma classe do SDK do Apache Beam que define uma função de processamento distribuída.
O Dataflow inicia várias threads em cada trabalhador e a memória de cada trabalhador é partilhada por todas as threads. Um segmento é uma única tarefa executável em execução num processo maior. O número predefinido de threads depende de vários fatores e varia entre tarefas em lote e de streaming.
Se o seu pipeline precisar de mais memória do que a quantidade predefinida de memória disponível nos trabalhadores, pode ocorrer um erro de falta de memória.
Os pipelines do Dataflow usam principalmente a memória do trabalhador de três formas:
Memória operacional do trabalhador
Os trabalhadores do Dataflow precisam de memória para os respetivos sistemas operativos e processos do sistema. Normalmente, a utilização de memória do trabalhador não excede 1 GB. Normalmente, a utilização é inferior a 1 GB.
- Vários processos no trabalhador usam memória para garantir que o seu pipeline está a funcionar corretamente. Cada um destes processos pode reservar uma pequena quantidade de memória para a respetiva operação.
- Quando o pipeline não usa o Streaming Engine, os processos de trabalho adicionais usam memória.
Memória do processo do SDK
Os processos do SDK Apache Beam podem criar objetos e dados que são partilhados entre threads no processo, referidos nesta página como objetos e dados partilhados do SDK. A utilização de memória destes objetos e dados partilhados do SDK é denominada memória do processo do SDK. A lista seguinte inclui exemplos de objetos e dados partilhados do SDK:
- Entradas laterais
- Modelos de aprendizagem automática
- Objetos singleton na memória
- Objetos Python criados com o módulo
apache_beam.utils.shared - Dados carregados de origens externas, como o Cloud Storage ou o BigQuery
Tarefas de streaming que não usam entradas laterais de armazenamento do Streaming Engine na memória. Para pipelines Java e Go, cada trabalhador tem uma cópia da entrada lateral. Para pipelines Python, cada processo do SDK Apache Beam tem uma cópia da entrada lateral.
As tarefas de streaming que usam o Streaming Engine têm um limite de tamanho de entrada lateral de 80 MB. As entradas laterais são armazenadas fora da memória do trabalhador.
A utilização de memória de objetos e dados partilhados do SDK aumenta linearmente com o número de processos do SDK Apache Beam. Nos pipelines Java e Go, é iniciado um processo do SDK do Apache Beam por trabalhador. Em pipelines Python, é iniciado um processo do SDK Apache Beam por vCPU. Os objetos e os dados partilhados do SDK são reutilizados em vários threads no mesmo processo do SDK Apache Beam.
Utilização de memória: DoFn
DoFn é uma classe do SDK do Apache Beam que define uma função de processamento distribuído.
Cada trabalhador pode executar DoFn instâncias em simultâneo. Cada thread executa uma DoFn
instância. Quando avaliar a utilização total de memória, calcular o tamanho do conjunto de trabalho ou a quantidade de memória necessária para uma aplicação continuar a funcionar pode ser útil. Por exemplo, se um indivíduo DoFn usar
um máximo de 5 MB de memória e um trabalhador tiver 300 threads, a DoFn utilização de memória
pode atingir um pico de 1,5 GB, ou o número de bytes de memória multiplicado pelo
número de threads. Consoante a forma como os trabalhadores estão a usar a memória, um pico na utilização de memória pode fazer com que os trabalhadores fiquem sem memória.
É difícil estimar quantas instâncias de um
DoFn
o Dataflow cria. O número depende de vários fatores, como o SDK, o tipo de máquina, entre outros. Além disso, o DoFn pode ser usado por vários threads sucessivamente.
O serviço Dataflow não garante quantas vezes um DoFn é invocado, nem garante o número exato de instâncias DoFn criadas ao longo de um pipeline.
No entanto, a tabela seguinte dá algumas informações sobre o nível de paralelismo que pode esperar e estima um limite superior para o número de instâncias DoFn.
SDK Python do Beam
| Lote | Streaming sem o motor de streaming | Streaming Engine | |
|---|---|---|---|
| Paralelismo |
1 processo por vCPU 1 discussão por processo 1 thread por vCPU
|
1 processo por vCPU 12 discussões por processo 12 discussões por vCPU |
1 processo por vCPU 12 discussões por processo 12 discussões por vCPU
|
Número máximo de instâncias DoFn em simultâneo (todos estes números estão sujeitos a alterações em qualquer altura) |
1 DoFn por discussão
1
|
1 DoFn por discussão
12
|
1 DoFn por discussão
12
|
SDK Java/Go do Beam
| Lote | Streaming Appliance e Streaming Engine sem o runner v2 | Streaming Engine com runner v2 | |
|---|---|---|---|
| Paralelismo |
1 processo por VM trabalhadora 1 thread por vCPU
|
1 processo por VM trabalhadora 300 threads por processo 300 threads por VM trabalhadora
|
1 processo por VM trabalhadora 500 discussões por processo 500 threads por VM trabalhadora
|
Número máximo de instâncias DoFn em simultâneo (todos estes números estão sujeitos a alterações em qualquer altura) |
1 DoFn por discussão
1
|
1 DoFn por discussão
300
|
1 DoFn por discussão
500
|
Por exemplo, quando usa o SDK Python com um n1-standard-2
trabalhador do Dataflow, aplica-se o seguinte:
- Tarefas em lote: o Dataflow inicia um processo por vCPU (dois neste caso). Cada processo usa uma discussão e cada discussão cria uma instância
DoFn. - Tarefas de streaming com o Streaming Engine: o Dataflow inicia um processo por vCPU (dois no total). No entanto, cada processo pode gerar até 12 threads, cada um com a sua própria instância DoFn.
Quando cria pipelines complexos, é importante compreender o
DoFn ciclo de vida.
Certifique-se de que as suas funções DoFn são serializáveis e evite modificar o argumento do elemento diretamente nas mesmas.
Quando tem um pipeline multilingue e mais do que um SDK do Apache Beam está a ser executado no worker, o worker usa o grau mais baixo de paralelismo de thread por processo possível.
Diferenças entre Java, Go e Python
O Java, o Go e o Python gerem os processos e a memória de forma diferente. Como resultado, a abordagem que deve adotar ao resolver problemas de falta de memória varia consoante a sua pipeline use Java, Go ou Python.
Pipelines Java e Go
Em pipelines Java e Go:
- Cada worker inicia um processo do SDK Apache Beam.
- Os objetos e os dados partilhados do SDK, como entradas laterais e caches, são partilhados entre todos os threads no trabalhador.
- Normalmente, a memória usada pelos objetos e dados partilhados do SDK não é dimensionada com base no número de vCPUs no trabalhador.
Pipelines Python
Em pipelines Python:
- Cada trabalhador inicia um processo do SDK Apache Beam por vCPU.
- Os objetos e os dados partilhados do SDK, como as entradas laterais e as caches, são partilhados entre todos os threads em cada processo do SDK do Apache Beam.
- O número total de threads no worker é dimensionado linearmente com base no número de vCPUs. Como resultado, a memória usada pelos objetos e dados partilhados do SDK aumenta linearmente com o número de vCPUs.
- As discussões que realizam o trabalho são distribuídas pelos processos. As novas unidades de trabalho são atribuídas a um processo sem itens de trabalho ou ao processo com o menor número de itens de trabalho atribuídos atualmente.