Procesamiento paralelo

Las canalizaciones se ejecutan en clústeres de máquinas. Logran un alto rendimiento dividiendo el trabajo que se debe realizar y, luego, ejecutándolo en paralelo en los múltiples ejecutores distribuidos por todo el clúster. En general, cuanto mayor sea la cantidad de divisiones (también llamadas particiones), más rápido podrá ejecutarse la canalización. El nivel de paralelismo en tu canalización está determinado por las fuentes y las etapas de redistribución de la canalización.

Fuentes

Al comienzo de cada ejecución de la canalización, cada fuente de la canalización calcula qué datos se deben leer y cómo se pueden dividir esos datos en divisiones. Por ejemplo, considera una canalización básica que lee datos de Cloud Storage, realiza algunas transformaciones de Wrangler y, luego, vuelve a escribir en Cloud Storage.

Canalización básica que muestra la fuente de Cloud Storage, la transformación de Wrangler y el receptor de Cloud Storage

Cuando se inicia la canalización, la fuente de Cloud Storage examina los archivos de entrada y los divide en segmentos según el tamaño de los archivos. Por ejemplo, un archivo de un gigabyte se puede dividir en 100 partes, cada una de 10 MB. Cada ejecutor lee los datos de esa división, ejecuta las transformaciones de Wrangler y, luego, escribe el resultado en un archivo part.

Datos particionados en Cloud Storage en transformaciones paralelas de Wrangler en archivos de partes

Si tu canalización se ejecuta con lentitud, una de las primeras cosas que debes verificar es si tus fuentes crean suficientes divisiones para aprovechar al máximo el paralelismo. Por ejemplo, algunos tipos de compresión hacen que los archivos de texto sin formato no se puedan dividir. Si lees archivos comprimidos con gzip, es posible que notes que tu canalización se ejecuta mucho más lento que si leyeras archivos sin comprimir o archivos comprimidos con BZIP (que se puede dividir). Del mismo modo, si usas la fuente de la base de datos y la configuraste para que use solo una división, se ejecutará mucho más lento que si la configuras para que use más divisiones.

Shuffle

Algunos tipos de complementos hacen que los datos se mezclen en todo el clúster. Esto sucede cuando los registros que procesa un ejecutor deben enviarse a otro para realizar el cálculo. Las redistribuciones son operaciones costosas porque implican una gran cantidad de E/S. Los complementos que provocan la reorganización de los datos aparecen en la sección Analytics de Pipeline Studio. Estos incluyen complementos, como Group By, Deduplicate, Distinct y Joiner. Por ejemplo, supongamos que se agrega una etapa Group By a la canalización del ejemplo anterior.

Supongamos también que los datos que se leen representan las compras realizadas en una tienda de comestibles. Cada registro contiene un campo item y un campo num_purchased. En la etapa Group By, configuramos la canalización para agrupar registros en el campo item y calcular la suma del campo num_purchased.

Cuando se ejecuta la canalización, los archivos de entrada se dividen como se describió anteriormente. Después de eso, cada registro se redistribuye en el clúster de modo que cada registro con el mismo elemento pertenezca al mismo ejecutor.

Como se ilustra en el ejemplo anterior, los registros de compras de Apple se distribuían originalmente en varios ejecutores. Para realizar la agregación, todos esos registros debían enviarse a través del clúster al mismo ejecutor.

La mayoría de los complementos que requieren una aleatorización te permiten especificar la cantidad de particiones que se usarán cuando se aleatoricen los datos. Este parámetro controla la cantidad de ejecutores que se usan para procesar los datos aleatorizados.

En el ejemplo anterior, si la cantidad de particiones se establece en 2, cada ejecutor calcula los agregados para dos elementos en lugar de uno.

Ten en cuenta que es posible disminuir el paralelismo de tu canalización después de esa etapa. Por ejemplo, considera la vista lógica de la canalización:

Si la fuente divide los datos en 500 particiones, pero la función Group By los aleatoriza con 200 particiones, el nivel máximo de paralelismo después de Group By se reduce de 500 a 200. En lugar de 500 archivos de partes diferentes escritos en Cloud Storage, solo tienes 200.

Cómo elegir particiones

Si la cantidad de particiones es demasiado baja, no usarás toda la capacidad de tu clúster para paralelizar la mayor cantidad de trabajo posible. Si se configuran las particiones en un valor demasiado alto, aumenta la cantidad de sobrecarga innecesaria. En general, es mejor usar demasiadas particiones que muy pocas. La sobrecarga adicional es algo de lo que debes preocuparte si tu canalización tarda unos minutos en ejecutarse y estás tratando de reducir un par de minutos. Si tu canalización tarda horas en ejecutarse, por lo general, no debes preocuparte por la sobrecarga.

Una forma útil, pero demasiado simplista, de determinar la cantidad de particiones que se usarán es establecerla en max(cluster CPUs, input records / 500,000). En otras palabras, toma la cantidad de registros de entrada y divídela por 500,000. Si esa cantidad es mayor que la cantidad de CPU del clúster, úsala para la cantidad de particiones. De lo contrario, usa la cantidad de CPU del clúster. Por ejemplo, si tu clúster tiene 100 CPUs y se espera que la etapa de combinación tenga 100 millones de registros de entrada, usa 200 particiones.

Una respuesta más completa es que las redistribuciones funcionan mejor cuando los datos intermedios de redistribución de cada partición pueden caber por completo en la memoria de un ejecutor, de modo que no se necesite volcar nada en el disco. Spark reserva poco menos del 30% de la memoria de un ejecutor para almacenar datos de reorganización. La cantidad exacta es (memoria total - 300 MB) * 30%. Si suponemos que cada ejecutor está configurado para usar 2 GB de memoria, eso significa que cada partición no debería contener más de (2 GB - 300 MB) * 30% = aproximadamente 500 MB de registros. Si suponemos que cada registro se comprime a 1 KB de tamaño, eso significa que (500 MB / partición) / (1 KB/registro) = 500,000 registros por partición. Si tus ejecutores usan más memoria o tus registros son más pequeños, puedes ajustar este número según corresponda.

Sesgo de datos

Ten en cuenta que, en el ejemplo anterior, las compras de varios artículos se distribuyeron de manera uniforme. Es decir, hubo tres compras de cada uno de los siguientes productos: manzanas, bananas, zanahorias y huevos. El intercambio en una clave distribuida de forma uniforme es el tipo de intercambio con mejor rendimiento, pero muchos conjuntos de datos no tienen esta propiedad. Si continuamos con el ejemplo anterior de la compra en el supermercado, esperarías tener muchas más compras de huevos que de tarjetas de boda. Cuando hay algunas claves de aleatorización que son mucho más comunes que otras, se trata de datos sesgados. Los datos sesgados pueden tener un rendimiento significativamente peor que los datos no sesgados, ya que una cantidad desproporcionada de trabajo se realiza con una pequeña cantidad de ejecutores. Esto hace que un pequeño subconjunto de particiones sea mucho más grande que todas las demás.

En este ejemplo, hay cinco veces más compras de huevos que de tarjetas, lo que significa que el agregado de huevos tarda aproximadamente cinco veces más en calcularse. No importa mucho cuando se trata de solo 10 registros, en lugar de dos, pero sí hace una gran diferencia cuando se trata de cinco mil millones de registros en lugar de mil millones. Cuando tienes una distribución asimétrica de los datos, la cantidad de particiones que se usan en un proceso de Shuffle no tiene un gran impacto en el rendimiento de la canalización.

Puedes reconocer la asimetría de los datos si examinas el gráfico de registros de salida a lo largo del tiempo. Si la etapa genera registros a un ritmo mucho más alto al inicio de la ejecución de la canalización y, luego, se ralentiza de repente, es posible que tengas datos sesgados.

También puedes reconocer el sesgo de datos examinando el uso de la memoria del clúster a lo largo del tiempo. Si tu clúster tiene capacidad completa durante un tiempo, pero, de repente, tiene un uso de memoria bajo durante un período, esto también es un signo de que estás lidiando con una asimetría de datos.

Los datos sesgados afectan el rendimiento de manera más significativa cuando se realiza una unión. Existen algunas técnicas que se pueden usar para mejorar el rendimiento de las uniones sesgadas. Para obtener más información, consulta Procesamiento paralelo para operaciones de JOIN.

Ajuste adaptable para la ejecución

Para ajustar la ejecución de forma adaptativa, especifica el rango de particiones que se usará, no el número exacto de la partición. El número de partición exacto, incluso si se configura en la canalización, se ignora cuando se habilita la ejecución adaptable.

Si usas un clúster efímero de Managed Service for Apache Spark, Cloud Data Fusion establece la configuración adecuada de forma automática, pero para los clústeres estáticos de Managed Service for Apache Spark o Hadoop, se pueden establecer los siguientes dos parámetros de configuración:

  • spark.default.parallelism: Se debe establecer en la cantidad total de vCores disponibles en el clúster. Esto garantiza que tu clúster no esté subcargado y define el límite inferior para la cantidad de particiones.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: Configúralo en 32 veces la cantidad de vCores disponibles en el clúster. Define el límite superior para la cantidad de particiones.
  • Spark.sql.adaptive.enabled: Para habilitar las optimizaciones, establece este valor en true. El servicio administrado para Apache Spark lo establece automáticamente, pero si usas clústeres genéricos de Hadoop, debes asegurarte de que esté habilitado .

Estos parámetros se pueden configurar en la configuración del motor de una canalización específica o en las propiedades del clúster de un clúster estático de Managed Service for Apache Spark.

¿Qué sigue?