O modo de flexibilidade melhorada (EFM) do Dataproc gere os dados de aleatorização para minimizar os atrasos no progresso das tarefas causados pela remoção de nós de um cluster em execução. O EFM transfere dados de mistura escrevendo dados para os trabalhadores primários. Os trabalhadores extraem dados desses nós remotos durante a fase de redução. Este modo só está disponível para tarefas do Spark.
Uma vez que o EFM não armazena dados de aleatorização intermédios em trabalhadores secundários, o EFM é adequado para clusters que usam VMs preemptíveis ou apenas dimensionam automaticamente o grupo de trabalhadores secundários.
O EFM é suportado no Dataproc
2.0.31+
, 2.1.6+
, 2.2+
e versões de
imagens posteriores.
- Os trabalhos do Apache Hadoop YARN que não suportam a mudança de localização do AppMaster podem falhar no modo de flexibilidade melhorada (consulte Quando esperar que os AppMasters terminem).
- O modo de flexibilidade melhorada não é recomendado:
- Num cluster que tenha apenas trabalhadores principais
- em tarefas de streaming, uma vez que pode demorar até 30 minutos após a conclusão da tarefa para limpar os dados de mistura intermédios.
- num cluster que executa notebooks, uma vez que os dados de aleatorização podem não ser limpos durante a duração da sessão.
- quando as tarefas do Spark são executadas num cluster com a desativação suave ativada. A desativação gradual e o EFM podem ter objetivos diferentes, uma vez que o mecanismo de desativação gradual do YARN mantém os nós DECOMMISSIONING até que todas as aplicações envolvidas sejam concluídas.
- num cluster que executa tarefas do Spark e não do Spark.
- O modo de flexibilidade melhorada não é suportado:
- Quando o ajuste de escala automático do trabalhador principal está ativado. Na maioria dos casos, os trabalhadores principais continuam a armazenar dados de aleatorização que não são migrados automaticamente. A redução do grupo de trabalho principal anula as vantagens da EFM.
Use o modo de flexibilidade melhorada
A flexibilidade melhorada é ativada quando cria um cluster definindo a
dataproc:efm.spark.shuffle
propriedade do cluster
como primary-worker
.
Exemplo:
gcloud dataproc clusters create cluster-name \ --region=region \ --properties=dataproc:efm.spark.shuffle=primary-worker \ other flags ...
Exemplo do Apache Spark
- Execute uma tarefa WordCount no texto público de Shakespeare usando o jar de exemplos do Spark no cluster EFM.
gcloud dataproc jobs submit spark \ --cluster=cluster-name \ --region=region \ --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \ --class=org.apache.spark.examples.JavaWordCount \ -- gs://apache-beam-samples/shakespeare/macbeth.txt
Configure SSDs locais
Uma vez que o EFM escreve dados de mistura intermédios em discos ligados a VMs, beneficia do débito adicional e dos IOPS fornecidos pelos SSDs locais. Para facilitar a atribuição de recursos, segmente um objetivo de aproximadamente 1 partição de SSD local por 4 vCPUs ao configurar máquinas de trabalho primárias.
Para anexar SSDs locais, transmita a flag --num-worker-local-ssds
ao comando gcloud Dataproc clusters create.
Geralmente, não precisa de SSDs locais em trabalhadores secundários.
A adição de SSDs locais aos trabalhadores secundários de um cluster (através da flag --num-secondary-worker-local-ssds
) é frequentemente menos importante porque os trabalhadores secundários não escrevem dados de mistura localmente.
No entanto, uma vez que os SSDs locais melhoram o desempenho do disco local, pode optar por adicionar SSDs locais a trabalhadores secundários se esperar que os trabalhos sejam limitados por E/S devido à utilização do disco local: o seu trabalho usa um disco local significativo para espaço temporário ou as suas partições são demasiado grandes para caber na memória e vão transbordar para o disco.
Rácio de trabalhadores secundários
Uma vez que os trabalhadores secundários escrevem os respetivos dados de mistura nos trabalhadores principais, o seu cluster tem de conter um número suficiente de trabalhadores principais com recursos de CPU, memória e disco suficientes para acomodar a carga de mistura da sua tarefa. Para clusters de escala automática, para impedir que o grupo principal seja dimensionado e cause um comportamento indesejável, defina minInstances
para o valor maxInstances
na política de escala automática para o grupo de trabalho principal.
Se tiver uma elevada proporção de trabalhadores secundários para principais (por exemplo, 10:1), monitorize a utilização da CPU, a rede e a utilização do disco dos trabalhadores principais para determinar se estão sobrecarregados. Para o fazer:
Aceda à página Instâncias de VM na Google Cloud consola.
Clique na caixa de verificação no lado esquerdo do trabalhador principal.
Clique no separador MONITORIZAÇÃO para ver a utilização da CPU do trabalhador principal, os IOPS do disco, os bytes de rede e outras métricas.
Se os trabalhadores principais estiverem sobrecarregados, considere aumentar o número de trabalhadores principais manualmente.
Redimensione o grupo de trabalhadores principal
O grupo de trabalhadores principal pode ser aumentado em segurança, mas a redução do grupo de trabalhadores principal pode afetar negativamente o progresso das tarefas. As operações que reduzem o número de trabalhadores do grupo de trabalhadores principal devem usar a desativação suave, que é ativada ao definir o sinalizador --graceful-decommission-timeout
.
Clusters com escala automática: a escalabilidade do grupo de trabalho principal está desativada em clusters do EFM com políticas de escala automática. Para redimensionar o grupo de trabalho principal num cluster com redimensionamento automático:
Desative o dimensionamento automático.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --disable-autoscaling
Ajuste a escala do grupo principal.
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --num-workers=num-primary-workers \ --graceful-decommission-timeout=graceful-decommission-timeout # (if downscaling)
Reative o redimensionamento automático:
gcloud dataproc clusters update \ --cluster=cluster-name \ --region=region \ --autoscaling-policy=autoscaling-policy
Monitorize a utilização do disco do trabalhador principal
Os trabalhadores principais têm de ter espaço em disco suficiente para os dados de mistura aleatória do cluster.
Pode monitorizar esta situação indiretamente através da métrica remaining HDFS capacity
.
À medida que o disco local fica cheio, o espaço fica indisponível para o HDFS e a capacidade restante diminui.
Por predefinição, quando a utilização do disco local de um trabalhador principal excede 90% da capacidade, o nó é marcado como UNHEALTHY na IU do nó do YARN. Se tiver problemas de capacidade do disco, pode eliminar dados não usados do HDFS ou aumentar o conjunto de trabalhadores principal.
Configuração avançada
Partição e paralelismo
Quando enviar uma tarefa do Spark, configure um nível adequado de particionamento. Decidir o número de partições de entrada e saída para uma fase de mistura envolve uma troca entre diferentes caraterísticas de desempenho. É melhor fazer experiências com valores que funcionem para os seus formatos de tarefas.
Partições de entrada
A partição de entrada do Spark e do MapReduce é determinada pelo conjunto de dados de entrada. Quando lê ficheiros do Cloud Storage, cada tarefa processa aproximadamente um "tamanho do bloco" de dados.
Para tarefas Spark SQL, o tamanho máximo da partição é controlado por
spark.sql.files.maxPartitionBytes
. Considere aumentá-lo para 1 GB:spark.sql.files.maxPartitionBytes=1073741824
.Para RDDs do Spark, o tamanho da partição é normalmente controlado com
fs.gs.block.size
, que tem um valor predefinido de 128 MB. Pondere aumentá-lo para 1 GB. Exemplo:--properties spark.hadoop.fs.gs.block.size=1073741824
Partições de saída
O número de tarefas nas fases subsequentes é controlado por várias propriedades. Em trabalhos maiores que processam mais de 1 TB, considere ter, pelo menos, 1 GB por partição.
Para o Spark SQL, o número de partições de saída é controlado por
spark.sql.shuffle.partitions
.Para tarefas do Spark que usam a API RDD, pode especificar o número de partições de saída ou definir
spark.default.parallelism
.
Ajuste aleatório para o ajuste aleatório do trabalhador principal
A propriedade mais significativa é --properties yarn:spark.shuffle.io.serverThreads=<num-threads>
.
Tenha em atenção que esta é uma propriedade YARN ao nível do cluster porque o servidor de mistura do Spark é executado como parte do Node Manager. Por predefinição, é o dobro (2x) do número de núcleos na máquina (por exemplo, 16 threads num n1-highmem-8). Se "Shuffle Read Blocked Time" for superior a 1 segundo e os trabalhadores principais não tiverem atingido os limites de rede, CPU ou disco, considere aumentar o número de threads do servidor de mistura.
Em tipos de máquinas maiores, considere aumentar spark.shuffle.io.numConnectionsPerPeer
, que tem o valor predefinido de 1. (Por exemplo, defina-o como 5 associações por par de anfitriões).
Aumentar tentativas
O número máximo de tentativas permitidas para apps principais, tarefas e fases pode ser configurado definindo as seguintes propriedades:
yarn:yarn.resourcemanager.am.max-attempts spark:spark.task.maxFailures spark:spark.stage.maxConsecutiveAttempts
Uma vez que as tarefas e os mestres de apps são terminados com maior frequência em clusters que usam muitas VMs preemptivas ou o dimensionamento automático sem desativação gradual, o aumento dos valores das propriedades anteriores nesses clusters pode ajudar (tenha em atenção que a utilização do EFM com o Spark e a desativação gradual não é suportada).
Desativação suave do YARN em clusters EFM
A desativação gradual do YARN pode ser usada para remover nós rapidamente com um impacto mínimo nas aplicações em execução. Para clusters de escala automática, o tempo limite de desativação gradual pode ser definido numa AutoscalingPolicy anexada ao cluster EFM.
Melhorias da EFM na desativação suave
Uma vez que os dados intermédios são armazenados num sistema de ficheiros distribuído, os nós podem ser removidos de um cluster EFM assim que todos os contentores em execução nesses nós terminarem. Em comparação, os nós não são removidos em clusters do Dataproc padrão até a aplicação terminar.
A remoção de nós não aguarda que os mestres de apps em execução num nó terminem. Quando o contentor principal da app é terminado, é reagendado noutro nó que não está a ser desativado. O progresso da tarefa não é perdido: o novo mestre da app recupera rapidamente o estado do mestre da app anterior lendo o histórico de tarefas.