The following sections provide tips to help you fine-tune your Dataproc Spark applications.
Use ephemeral clusters
When you use the Dataproc "ephemeral" cluster model, you create a dedicated cluster for each job, and when the job finishes, you delete the cluster. With the ephemeral model, you can treat storage and compute separately, saving job input and output data in Cloud Storage or BigQuery, using the cluster for compute and temporary data storage only.
Persistent cluster pitfalls
Using ephemeral one-job clusters avoids the following pitfalls and potential problems associated with using shared and long-running "persistent" clusters:
- Single points of failure: a shared cluster error state can cause all jobs to fail, blocking an entire data pipeline. Investigating and recovering from an error can take hours. Since ephemeral clusters keep temporary in-cluster states only, when an error occurs, they can be quickly deleted and recreated.
- Difficulty maintaining and migrating cluster states in HDFS, MySQL or local filesystems
- Resource contention among jobs that negatively affects SLOs
- Unresponsive service daemons caused by memory pressure
- Buildup of logs and temporary files that can exceed disk capacity
- Upscaling failure due to cluster zone stockout
- Lack of support for outdated cluster image versions.
Ephemeral cluster benefits
On the positive side, ephemeral clusters let you do the following:
- Configure different IAM permissions for different jobs with different Dataproc VM service accounts.
- Optimize a cluster's hardware and software configurations for each job, changing cluster configurations as needed.
- Upgrade image versions in new clusters to get the latest security patches, bug fixes, and optimizations.
- Troubleshoot issues more quickly on an isolated, single-job cluster.
- Save costs by paying for ephemeral cluster running time only, not for idle time between jobs on a shared cluster.
Use Spark SQL
The Spark SQL DataFrame API is a significant optimization of the RDD API. If you interact with code that uses RDDs, consider reading data as a DataFrame before passing an RDD in the code. In Java or Scala code, consider using the Spark SQL Dataset API as a superset of RDDs and DataFrames.
Use Apache Spark 3
Dataproc 2.0 installs Spark 3, which includes the following features and performance improvements:
- GPU support
- Ability to read binary files
- Performance improvements
- Dynamic Partition Pruning
- Adaptive query execution, which optimizes Spark jobs in real time
Use Dynamic Allocation
Apache Spark includes a Dynamic Allocation feature that scales
the number of Spark executors on workers within a cluster. This feature allows
a job to use the full Dataproc cluster even when the cluster
scales up. This feature is enabled by default on Dataproc
(spark.dynamicAllocation.enabled is set to true). See
Spark Dynamic Allocation
for more information.
Use Dataproc autoscaling
Dataproc Autoscaling dynamically adds and removes Dataproc workers from a cluster to help ensure that Spark jobs have the resources needed to complete quickly.
It is a best practice to configure the autoscaling policy to only scale secondary workers.
Use Dataproc Enhanced Flexibility Mode
Clusters with preemptible VMs or an autoscaling policy may receive FetchFailed exceptions when workers are preempted or removed before they finish serving shuffle data to reducers. This exception can cause task retries and longer job completion times.
Recommendation: Use Dataproc Enhanced Flexibility Mode, which does not store intermediate shuffle data on secondary workers, so that secondary workers can be safely preempted or scaled down.
Configure partitioning and shuffling
Spark stores data in temporary partitions on the cluster. If your application groups or joins DataFrames, it shuffles the data into new partitions according to the grouping and low-level configuration.
Data partitioning significantly impacts application performance: too few partitions limit job parallelism and cluster resource utilization; too many partitions slow down the job due to additional partition processing and shuffling.
Configuring partitions
The following properties govern the number and size of your partitions:
- spark.sql.files.maxPartitionBytes: the maximum size of partitions when you read in data from Cloud Storage. The default is 128 MB, which is sufficiently large for most applications that process less than 100 TB.
- spark.sql.shuffle.partitions: the number of partitions after performing a shuffle. The default is 1000 for 2.2 and later image version clusters. Recommendation: Set this to 3x the number of vCPUs in your cluster.
- spark.default.parallelism: the number of partitions returned after performing RDD transformations that require shuffles, such as join, reduceByKey, and parallelize. The default is the total number of vCPUs in your cluster. When using RDDs in Spark jobs, you can set this number to 3x your vCPUs.
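The 3x-vCPU recommendations above reduce to simple arithmetic. The following is a minimal Python sketch, not an official tool: the helper names and the example cluster shape (4 workers with 8 vCPUs each) are hypothetical, and the sketch assumes that only worker vCPUs are counted. The `spark:` prefix is the form Dataproc cluster properties use for Spark settings.

```python
def partition_properties(num_workers: int, vcpus_per_worker: int, multiplier: int = 3) -> dict:
    """Return suggested partition settings as Dataproc-style cluster properties.

    Applies the 3x-vCPU rule of thumb from the text; the multiplier is tunable.
    """
    if num_workers <= 0 or vcpus_per_worker <= 0:
        raise ValueError("worker and vCPU counts must be positive")
    total_vcpus = num_workers * vcpus_per_worker
    partitions = multiplier * total_vcpus
    return {
        "spark:spark.sql.shuffle.partitions": str(partitions),
        "spark:spark.default.parallelism": str(partitions),
    }


def to_properties_flag(props: dict) -> str:
    """Join properties into a comma-separated key=value string."""
    return ",".join(f"{key}={value}" for key, value in props.items())


# Hypothetical cluster: 4 workers x 8 vCPUs = 32 vCPUs, so 3 x 32 = 96 partitions.
props = partition_properties(num_workers=4, vcpus_per_worker=8)
print(to_properties_flag(props))
```

The resulting string uses the comma-separated key=value form that the `--properties` flag of `gcloud dataproc clusters create` accepts. Treat the computed value as a starting point and adjust it based on observed job behavior.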
Limit the number of files
There is a performance loss when Spark reads a large number of small files. Store data in larger file sizes, for example, file sizes in the 256 MB–512 MB range. Similarly, limit the number of output files (to force a shuffle, see Avoid unnecessary shuffles).
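To get a feel for how many files a dataset should be split into at these sizes, the arithmetic can be sketched as follows. This is an illustrative helper only: the function name is hypothetical, and the use of binary units (1 MB = 1024 * 1024 bytes) is an assumption.

```python
MB = 1024 * 1024
TB = 1024 * 1024 * MB


def suggested_file_count(dataset_bytes: int, target_file_bytes: int = 512 * MB) -> int:
    """Return how many files of roughly target_file_bytes hold the dataset."""
    if dataset_bytes < 0 or target_file_bytes <= 0:
        raise ValueError("sizes must be non-negative, and the target must be positive")
    # Integer ceiling division avoids floating-point rounding on large sizes.
    return max(1, -(-dataset_bytes // target_file_bytes))


# A 1 TB dataset needs 2048 files at 512 MB each, or 4096 files at 256 MB each.
print(suggested_file_count(1 * TB))
print(suggested_file_count(1 * TB, target_file_bytes=256 * MB))
```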
Configure adaptive query execution (Spark 3)
Adaptive query execution (enabled by default in Dataproc image version 2.0) provides Spark job performance improvements, including:
- Coalescing partitions after shuffles
- Converting sort-merge joins to broadcast joins
- Optimizations for skew joins.
Although the default configuration settings are sound for most use cases,
setting spark.sql.adaptive.advisoryPartitionSizeInBytes to the value of
spark.sql.files.maxPartitionBytes (default 128 MB) can be beneficial.
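One way to keep the two settings aligned is to derive both from a single value. The sketch below is an assumption-laden illustration, not a required configuration: the helper names are hypothetical, and the output is formatted as `--conf key=value` pairs of the kind that spark-submit accepts.

```python
def aqe_conf(max_partition_bytes: int = 128 * 1024 * 1024) -> dict:
    """Build Spark settings that align the AQE advisory size with the read partition size."""
    if max_partition_bytes <= 0:
        raise ValueError("max_partition_bytes must be positive")
    size = str(max_partition_bytes)
    return {
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.files.maxPartitionBytes": size,
        "spark.sql.adaptive.advisoryPartitionSizeInBytes": size,
    }


def to_submit_args(conf: dict) -> list:
    """Flatten a settings dict into a list of --conf key=value arguments."""
    args = []
    for key, value in conf.items():
        args.extend(["--conf", f"{key}={value}"])
    return args


conf = aqe_conf()
print(" ".join(to_submit_args(conf)))
```

Deriving both values from one parameter means that a later change to the read partition size cannot silently leave the advisory size behind.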
Avoid unnecessary shuffles
Spark allows users to manually trigger a shuffle to re-balance their data with
the repartition function. Shuffles are expensive, so reshuffling data should
be used cautiously. Setting the partition configurations
appropriately should be sufficient to allow Spark to automatically partition
your data.
Exception: When writing column-partitioned data to Cloud Storage, repartitioning on a specific column avoids writing many small files to achieve faster write times.
df.repartition("col_name").write.partitionBy("col_name").save("gs://...")
Store data in Parquet or Avro
Spark SQL defaults to reading and writing data in Snappy compressed Parquet files. Parquet is an efficient columnar file format that enables Spark to only read the data it needs to execute an application. This is an important advantage when working with large datasets. Other columnar formats, such as Apache ORC, also perform well.
For non-columnar data, Apache Avro provides an efficient binary-row file format. Although typically slower than Parquet, Avro's performance is better than text-based formats, such as CSV or JSON.
Optimize disk size
Persistent disk throughput scales with disk size, which can affect Spark job performance because jobs write metadata and shuffle data to disk. When using standard persistent disks, disk size should be at least 1 terabyte per worker (see Performance by persistent disk size).
To monitor worker disk throughput in the Google Cloud console:
- Click the cluster name on the Clusters page.
- Click the VM INSTANCES tab.
- Click on any worker name.
- Click the MONITORING tab, then scroll down to Disk Throughput to view worker throughput.
Disk considerations
Ephemeral Dataproc clusters, which don't benefit from persistent storage, can use local SSDs. Local SSDs are physically attached to the cluster and provide higher throughput than persistent disks (see the Performance table). Local SSDs are available at a fixed size of 375 gigabytes, but you can add multiple SSDs to increase performance.
Local SSDs don't persist data after a cluster is shut down. If you need persistent storage, you can use SSD persistent disks, which provide higher throughput for their size than standard persistent disks. SSD persistent disks are also a good choice if partition size will be smaller than 8 KB (however, avoid small partitions).
Attach GPUs to your cluster
Spark 3 supports GPUs. Use GPUs with the RAPIDS initialization action to speed up Spark jobs using the RAPIDS SQL Accelerator. Use the GPU driver initialization action to configure a cluster with GPUs.
Common job failures and fixes
Out of Memory
Examples:
- "Lost executor"
- "java.lang.OutOfMemoryError: GC overhead limit exceeded"
- "Container killed by YARN for exceeding memory limits"
Possible fixes:
- If using PySpark, raise spark.executor.memoryOverhead and lower spark.executor.memory.
- Use high memory machine types.
- Use smaller partitions.
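For the PySpark fix, Python worker processes run outside the JVM heap, so their memory counts against the overhead allowance rather than spark.executor.memory. The sketch below shows one way to shift memory from the heap to the overhead while keeping the total container request unchanged. The helper name and the example sizes are hypothetical; suitable values depend on your workload.

```python
def rebalance_executor_memory(executor_memory_mb: int, overhead_mb: int, shift_mb: int) -> dict:
    """Move shift_mb from the JVM heap to the off-heap overhead allowance.

    The sum of heap and overhead stays the same, so the per-executor
    container request does not grow.
    """
    if not 0 < shift_mb < executor_memory_mb:
        raise ValueError("shift_mb must be positive and smaller than the heap size")
    return {
        "spark.executor.memory": f"{executor_memory_mb - shift_mb}m",
        "spark.executor.memoryOverhead": f"{overhead_mb + shift_mb}m",
    }


# Hypothetical starting point: 6144 MB heap and 614 MB overhead, shifting 1024 MB.
conf = rebalance_executor_memory(executor_memory_mb=6144, overhead_mb=614, shift_mb=1024)
for key, value in conf.items():
    print(f"{key}={value}")
```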
Shuffle Fetch Failures
Examples:
- "FetchFailedException" (Spark error)
- "Failed to connect to..." (Spark error)
- "Failed to fetch" (MapReduce error)
Typically caused by premature removal of workers that still have shuffle data to serve.
Possible causes and fixes:
- Preemptible worker VMs were reclaimed or non-preemptible worker VMs were removed by the autoscaler. Solution: Use Enhanced Flexibility Mode to make secondary workers safely preemptible or scalable.
- An executor or mapper crashed due to an OutOfMemory error. Solution: increase the executor or mapper memory.
- The Spark shuffle service may be overloaded. Solution: decrease the number of job partitions.
YARN nodes are UNHEALTHY
Examples (from YARN logs):
...reported UNHEALTHY with details: 1/1 local-dirs usable space is below
configured utilization percentage/no more usable space
[ /hadoop/yarn/nm-local-dir : used space above threshold of 90.0% ]
Often related to insufficient disk space for shuffle data. Diagnose by viewing log files:
- Open your project's Clusters page in the Google Cloud console, then click the cluster's name.
- Click VIEW LOGS.
- Filter logs by hadoop-yarn-nodemanager.
- Search for "UNHEALTHY".
Possible fixes:
- The user cache is stored in the directory specified by the
yarn.nodemanager.local-dirs property in the yarn-site.xml file. This file is located at /etc/hadoop/conf/yarn-site.xml. You can check the free space in the /hadoop/yarn/nm-local-dir path, and free up space by deleting the /hadoop/yarn/nm-local-dir/usercache user cache folder.
- If the log reports "UNHEALTHY" status, recreate your cluster with larger disk space, which will increase the throughput cap.
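The free-space check can be approximated on a worker with a short script. This is a rough sketch: the helper names are hypothetical, the 90% threshold is taken from the log excerpt above, and the percentage computed here may differ slightly from the one YARN's own disk health check reports.

```python
import os
import shutil

NM_LOCAL_DIR = "/hadoop/yarn/nm-local-dir"  # path shown in the log excerpt


def usage_percent(total_bytes: int, used_bytes: int) -> float:
    """Return used space as a percentage of total space."""
    if total_bytes <= 0:
        raise ValueError("total_bytes must be positive")
    return 100.0 * used_bytes / total_bytes


def is_above_threshold(path: str, threshold_percent: float = 90.0) -> bool:
    """Report whether the filesystem holding path is fuller than the threshold."""
    usage = shutil.disk_usage(path)
    return usage_percent(usage.total, usage.used) > threshold_percent


# Run the check only where the directory exists, that is, on a cluster node.
if os.path.isdir(NM_LOCAL_DIR):
    print("above threshold:", is_above_threshold(NM_LOCAL_DIR))
```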
Job fails due to insufficient driver memory
When running jobs in cluster mode, the job fails if the memory size of the master node is significantly larger than the worker node memory size.
Example from driver logs:
'Exception in thread "main" java.lang.IllegalArgumentException: Required AM memory (32768+3276 MB) is above the max threshold (12288 MB) of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.'
Possible fixes:
- Set spark:spark.driver.memory less than yarn:yarn.scheduler.maximum-allocation-mb.
- Use the same machine type for master and worker nodes.
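The numbers in the log message can be reproduced with a short calculation. The sketch below assumes Spark's default driver memory overhead of 10% of driver memory with a 384 MB minimum, which matches the "32768+3276 MB" in the example. The function names are hypothetical.

```python
def required_am_memory_mb(driver_memory_mb: int,
                          overhead_factor: float = 0.10,
                          min_overhead_mb: int = 384) -> int:
    """Return driver memory plus overhead, the amount requested from YARN in cluster mode."""
    overhead_mb = max(int(driver_memory_mb * overhead_factor), min_overhead_mb)
    return driver_memory_mb + overhead_mb


def fits_yarn_limit(driver_memory_mb: int, yarn_max_allocation_mb: int) -> bool:
    """Check the request against yarn.scheduler.maximum-allocation-mb."""
    return required_am_memory_mb(driver_memory_mb) <= yarn_max_allocation_mb


# Values from the example log: a 32768 MB driver on a cluster capped at 12288 MB.
print(required_am_memory_mb(32768))
print(fits_yarn_limit(32768, 12288))
```

Because the overhead is added on top of spark.driver.memory, the driver memory setting has to leave room for it below the YARN maximum, not merely equal the maximum.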
What's next
- Learn more about Spark performance tuning.