Monday, April 13, 2015

Performance Tuning Tips for Running Spark Applications

Based on what I have read and experimented with so far, I have put together some key factors that can impact the performance of Apache Spark applications, specifically Spark Streaming. Concurrency and memory consumption are two key areas that need careful attention when tuning for performance. The tips below mainly address improving concurrency and reducing memory consumption of Spark applications.

Partitions and Concurrency

A Spark job consists of several transformations, which are broken down into stages that form a pipeline. In the case of Spark Streaming, as data comes in, it is collected, buffered and packaged in blocks during a given time interval - this interval is commonly referred to as the "batch interval". When this interval elapses, the collected data is sent to Spark for processing. The data is stored in blocks, and these blocks become the RDD partitions. So, the number of tasks that will be scheduled for a micro-batch can be expressed as below:

number of tasks = (number of stages) * (number of partitions)
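
As a worked example (the numbers here are illustrative, not from any particular workload), a job with 5 stages operating on RDDs of 200 partitions would schedule:

number of tasks = 5 * 200 = 1000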



From the above expression, we can see that reducing the number of partitions has a direct impact on the number of tasks that will be scheduled for computation. However, having too few partitions leads to less concurrency, which can cause tasks to take longer to complete. In addition, having fewer partitions increases the likelihood of data skew. Thus, the number of partitions can be neither too low nor too high - it has to be balanced. A good lower bound for the number of partitions is at least 2 * (# cores in cluster). For example, if you have 100 cores in the cluster, a good starting point is around 200 partitions. From there, you can keep increasing the number of partitions until you see a good balance of concurrency and task execution time.
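
A minimal sketch of putting this into practice, assuming a simple socket-based stream (the app name, host, and port are illustrative placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("PartitionTuningExample")
val ssc = new StreamingContext(conf, Seconds(2))

// A socket source stands in for whatever receiver the application uses.
val lines = ssc.socketTextStream("localhost", 9999)

// With 100 cores, start at 2 * 100 = 200 partitions and tune upward from there.
// repartition redistributes each micro-batch RDD before the heavy stages run.
val repartitioned = lines.repartition(200)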

Batch Interval and Block Interval

As mentioned earlier, the batch interval refers to the time interval during which the data is collected and buffered by the receiver before being sent to Spark. The receiver sends the data to the executor, which manages the data in blocks, and each block becomes a partition of the RDD produced during each batch interval. The number of partitions created per consumer in each batch is determined by the block interval - the interval at which data received by Spark Streaming receivers is formed into blocks of data. This interval can be set with the spark.streaming.blockInterval property.


So, the number of partitions created per consumer can be calculated with this expression:


number of partitions per consumer = batchInterval / blockInterval

Now, the total number of partitions per application is

number of partitions per application = (#consumers) * (batchInterval / blockInterval)
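
To make the arithmetic concrete: with a 2-second batch interval and a 200 ms block interval, each consumer produces 2000 / 200 = 10 partitions per batch; with 5 consumers, that is 5 * 10 = 50 partitions. A hedged sketch of configuring both intervals (values are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("BlockIntervalExample")
  // 200 ms blocks; note that older Spark versions expect a plain
  // millisecond value ("200") rather than a duration string.
  .set("spark.streaming.blockInterval", "200ms")

// The batch interval is fixed when the StreamingContext is created.
val ssc = new StreamingContext(conf, Seconds(2))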

Memory Consumption

This is another important factor that can really impact the performance of an application. It is very important to understand the memory consumption of the dataset used for computation, as well as the cost associated with accessing and disposing of it.
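
One way to get a feel for a dataset's footprint is Spark's SizeEstimator utility. A rough sketch (the Clickstream case class and its fields are illustrative, not from this post):

import org.apache.spark.util.SizeEstimator

// Illustrative record type standing in for the application's own classes.
case class Clickstream(userId: String, url: String, timestamp: Long)

val sample = Clickstream("user-42", "http://example.com/page", 1428900000000L)

// SizeEstimator.estimate gives a rough in-memory footprint in bytes;
// multiply by the record count to approximate the dataset's cache cost.
println(s"Approximate size in bytes: ${SizeEstimator.estimate(sample)}")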

Default Executor Memory

By default, the amount of memory to use per executor process is only 512MB, which might not work for most applications dealing with large datasets. This amount is controlled by the spark.executor.memory config property, which can be set in the properties file (default is spark-defaults.conf) or supplied at runtime to the spark-submit script like below (the application class and jar are placeholders):


./bin/spark-submit --class com.example.MyApp --executor-memory 4g my-app.jar
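
The same setting can also be applied programmatically when building the SparkConf; a minimal sketch (the app name is a placeholder):

import org.apache.spark.SparkConf

// Equivalent programmatic form; the same setting can instead live in
// conf/spark-defaults.conf as: spark.executor.memory 4g
val conf = new SparkConf()
  .setAppName("MemoryTuningExample")
  .set("spark.executor.memory", "4g")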


Data Serialization


Because of the in-memory nature of most Spark computations, serialization plays an important role in the performance of an application. If the data formats used in the application are slow to serialize, they will greatly slow down the computational performance of the application. By default, Spark uses Java serialization, which is very flexible and works with most classes, but it is also very slow. Kryo serialization, which uses the Kryo library, is much more compact and faster than Java serialization, but it does not support all Serializable types.

You can switch to Kryo by initializing your job with a SparkConf object and setting the serializer property:

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

This setting configures the serializer used not only for shuffling data between worker nodes but also for serializing RDDs to disk. Another requirement for the Kryo serializer is to register the classes in advance for best performance. If the classes are not registered, Kryo will store the full class name with each object (instead of mapping it to an ID), which can lead to wasted resources.

conf.set("spark.kryo.registrator", com.art.spark.AvroKyroRegistrator");

Spark Kryo Serialization

The Scala snippet above shows how to register the Avro classes. This registers the use of Avro's specific binary serialization for the Clickstream class.
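
For reference, a hypothetical sketch of what such a registrator might look like - this assumes Clickstream is an Avro-generated SpecificRecord class and that the twitter chill-avro library is on the classpath; it is not code from the original post:

import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.spark.serializer.KryoRegistrator

class AvroKyroRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Registering the class lets Kryo write a compact ID instead of the
    // fully qualified class name with every serialized object, and the
    // chill-avro serializer handles Avro's specific binary encoding.
    kryo.register(classOf[Clickstream],
      AvroSerializer.SpecificRecordBinarySerializer[Clickstream])
  }
}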