Processamento paralelo

Os pipelines são executados em clusters de máquinas. Eles alcançam alta capacidade de processamento dividindo o trabalho que precisa ser feito e executando-o em paralelo nos vários executores distribuídos pelo cluster. Em geral, quanto maior o número de divisões (também chamadas de partições), mais rápido o pipeline pode ser executado. O nível de paralelismo no pipeline é determinado pelas origens e estágios de embaralhamento no pipeline.

Fontes

No início de cada execução de pipeline, todas as fontes calculam quais dados precisam ser lidos e como eles podem ser divididos. Por exemplo, considere um pipeline básico que lê do Cloud Storage, realiza algumas transformações do Wrangler e grava de volta no Cloud Storage.

Pipeline básico mostrando a origem do Cloud Storage, a transformação do Wrangler e o destino do Cloud Storage

Quando o pipeline é iniciado, a origem do Cloud Storage examina os arquivos de entrada e os divide em partes com base nos tamanhos dos arquivos. Por exemplo, um arquivo de um gigabyte pode ser dividido em 100 partes de 10 MB cada. Cada executor lê os dados dessa divisão, executa as transformações do Wrangler e grava a saída em um arquivo part.

Dados particionados no Cloud Storage em transformações paralelas do Wrangler em arquivos de partes

Se o pipeline estiver lento, uma das primeiras coisas a verificar é se as fontes estão criando divisões suficientes para aproveitar ao máximo o paralelismo. Por exemplo, alguns tipos de compactação tornam os arquivos de texto simples indivisíveis. Se você estiver lendo arquivos compactados com gzip, talvez note que seu pipeline é executado muito mais lentamente do que se você estivesse lendo arquivos não compactados ou compactados com BZIP (que pode ser dividido). Da mesma forma, se você estiver usando a fonte de dados do banco de dados e a tiver configurado para usar apenas uma divisão, ela será executada muito mais lentamente do que se você a configurar para usar mais divisões.

Embaralhamentos

Alguns tipos de plug-ins fazem com que os dados sejam embaralhados no cluster. Isso acontece quando os registros processados por um executor precisam ser enviados para outro para realizar o cálculo. Os embaralhamentos são operações caras porque envolvem muita E/S. Os plug-ins que fazem o embaralhamento dos dados aparecem na seção Analytics do Pipeline Studio. Isso inclui plug-ins como Group By, Deduplicate, Distinct e Joiner. Por exemplo, suponha que uma etapa Agrupar por seja adicionada ao pipeline no exemplo anterior.

Suponha também que os dados lidos representem compras feitas em um supermercado. Cada registro contém um campo item e um campo num_purchased. Na etapa Agrupar por, configuramos o pipeline para agrupar registros no campo item e calcular a soma do campo num_purchased.

Quando o pipeline é executado, os arquivos de entrada são divididos conforme descrito anteriormente. Depois disso, cada registro é embaralhado no cluster para que todos os registros com o mesmo item pertençam ao mesmo executor.

Como ilustrado no exemplo anterior, os registros de compras da Apple foram originalmente distribuídos por vários executores. Para realizar a agregação, todos esses registros precisavam ser enviados pelo cluster para o mesmo executor.

A maioria dos plug-ins que exigem uma reorganização permite especificar o número de partições a serem usadas ao reorganizar os dados. Isso controla quantos executores são usados para processar os dados embaralhados.

No exemplo anterior, se o número de partições for definido como 2, cada executor vai calcular agregações para dois itens em vez de um.

É possível diminuir o paralelismo do pipeline após essa etapa. Por exemplo, considere a visualização lógica do pipeline:

Se a origem dividir os dados em 500 partições, mas o Group By embaralhar usando 200 partições, o nível máximo de paralelismo após o Group By vai cair de 500 para 200. Em vez de 500 arquivos de partes diferentes gravados no Cloud Storage, você tem apenas 200.

Escolher partições

Se o número de partições for muito baixo, você não vai usar toda a capacidade do cluster para paralelizar o máximo de trabalho possível. Definir as partições muito altas aumenta a quantidade de sobrecarga desnecessária. Em geral, é melhor usar muitas partições do que poucas. O excesso de sobrecarga é algo a se preocupar se o pipeline levar alguns minutos para ser executado e você estiver tentando reduzir alguns minutos. Se o pipeline leva horas para ser executado, geralmente não é necessário se preocupar com o excesso.

Uma maneira útil, mas excessivamente simplista, de determinar o número de partições a serem usadas é definir como max(cluster CPUs, input records / 500,000). Em outras palavras, pegue o número de registros de entrada e divida por 500.000. Se esse número for maior que o número de CPUs do cluster, use-o para o número de partições. Caso contrário, use o número de CPUs do cluster. Por exemplo, se o cluster tiver 100 CPUs e a fase de embaralhamento tiver 100 milhões de registros de entrada, use 200 partições.

Uma resposta mais completa é que os embaralhamentos têm melhor desempenho quando os dados intermediários de cada partição cabem completamente na memória de um executor, para que nada precise ser gravado em disco. O Spark reserva pouco menos de 30% da memória de um executor para armazenar dados de embaralhamento. O número exato é (memória total - 300 MB) * 30%. Se presumirmos que cada executor está configurado para usar 2 GB de memória, isso significa que cada partição não deve conter mais de (2 GB - 300 MB) * 30% = aproximadamente 500 MB de registros. Se presumirmos que cada registro é compactado para 1 KB de tamanho, isso significa (500 MB / partição) / (1 KB / registro) = 500.000 registros por partição. Se os executores estiverem usando mais memória ou se os registros forem menores, ajuste esse número de acordo.

Desvio de dados

No exemplo anterior, as compras de vários itens foram distribuídas de maneira uniforme. Ou seja, foram três compras de maçãs, bananas, cenouras e ovos. A redistribuição em uma chave distribuída de maneira uniforme é o tipo de redistribuição com melhor desempenho, mas muitos conjuntos de dados não têm essa propriedade. Continuando o exemplo anterior de compra no supermercado, você esperaria ter muito mais compras de ovos do que de cartões de casamento. Quando há algumas chaves de embaralhamento muito mais comuns do que outras, você está lidando com dados enviesados. Dados com viés podem ter uma performance muito pior do que dados sem viés porque uma quantidade desproporcional de trabalho está sendo realizada por um pequeno número de executores. Isso faz com que um pequeno subconjunto de partições seja muito maior do que todas as outras.

Neste exemplo, há cinco vezes mais compras de ovos do que de cartões, o que significa que o agregado de ovos leva aproximadamente cinco vezes mais tempo para ser calculado. Não faz muita diferença ao lidar com apenas 10 registros em vez de dois, mas faz uma grande diferença ao lidar com cinco bilhões de registros em vez de um bilhão. Quando há distorção de dados, o número de partições usadas em um embaralhamento não tem um grande impacto no desempenho do pipeline.

Para reconhecer o desequilíbrio de dados, examine o gráfico de registros de saída ao longo do tempo. Se a etapa estiver gerando registros em um ritmo muito mais rápido no início da execução do pipeline e depois diminuir repentinamente, isso pode significar que você tem dados enviesados.

Também é possível reconhecer o desvio de dados examinando o uso da memória do cluster ao longo do tempo. Se o cluster estiver na capacidade máxima por algum tempo, mas de repente tiver baixo uso da memória por um período, isso também é um sinal de que você está lidando com o desvio de dados.

Os dados enviesados afetam mais o desempenho quando uma junção está sendo realizada. Há algumas técnicas que podem ser usadas para melhorar a performance em junções com distorção. Para mais informações, consulte Processamento paralelo para operações de JOIN.

Ajuste adaptativo para execução

Para ajustar a execução de forma adaptativa, especifique o intervalo de partições a serem usadas, não o número exato da partição. O número exato da partição, mesmo que definido na configuração do pipeline, é ignorado quando a execução adaptativa está ativada.

Se você estiver usando um cluster efêmero do serviço gerenciado para Apache Spark, o Cloud Data Fusion definirá a configuração adequada automaticamente. No entanto, para clusters estáticos do serviço gerenciado para Apache Spark ou Hadoop, os dois parâmetros de configuração a seguir podem ser definidos:

  • spark.default.parallelism: defina como o número total de vCores disponíveis no cluster. Isso garante que o cluster não esteja subutilizado e define o limite inferior para o número de partições.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: defina como 32 vezes o número de vCores disponíveis no cluster. Isso define o limite superior para o número de partições.
  • Spark.sql.adaptive.enabled: para ativar as otimizações, defina esse valor como true. O serviço gerenciado para Apache Spark define isso automaticamente, mas, se você estiver usando clusters genéricos do Hadoop, verifique se ele está ativado .

Esses parâmetros podem ser definidos na configuração do mecanismo de um pipeline específico ou nas propriedades do cluster de um cluster estático do Managed Service para Apache Spark.

A seguir