Based on what I have read and experimented with so far, here are some key factors that can impact the performance of Apache Spark applications, specifically Spark Streaming. Concurrency and memory consumption are two key areas that need careful attention when tuning for performance. The tips below mainly address improving concurrency and reducing the memory consumption of Spark applications.
Partitions and Concurrency
A Spark job consists of several transformations, which are broken down into stages that form a pipeline. In the case of Spark Streaming, incoming data is collected, buffered, and packaged into blocks during a given time interval - commonly referred to as the "batch interval". When this interval elapses, the collected data is sent to Spark for processing. The data is stored in blocks, and these blocks become the RDD partitions. So, the number of tasks scheduled for a micro-batch can be expressed as below:
number of tasks = (number of stages) * (number of partitions)
From the above expression, we can see that reducing the number of partitions directly reduces the number of tasks scheduled for computation. However, having too few partitions lowers concurrency, which can cause tasks to take longer to complete, and it also increases the likelihood of data skew. Thus, the number of partitions should be neither too low nor too high - it has to be balanced. A good lower bound is at least 2 * (# cores in cluster). For example, if you have 100 cores in the cluster, a good starting point is around 200 partitions. From there, keep increasing the number of partitions until you see a good balance of concurrency and task execution time.
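If a source creates too few partitions, one way to raise concurrency is to repartition the stream before the heavy transformations run. A minimal sketch, assuming a hypothetical input DStream named lines and the 100-core cluster from the example above:

// lines is an assumed input DStream; repartition redistributes its records
// across 200 partitions (2 * 100 cores) so more tasks can run concurrently.
val repartitioned = lines.repartition(200)

Note that repartition triggers a shuffle, so the gain in concurrency should outweigh the shuffle cost.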
Batch Interval and Block Interval
As mentioned earlier, the batch interval is the time during which data is collected and buffered by the receiver before being sent to Spark. The receiver sends the data to the executor, which manages it in blocks, and each block becomes a partition of the RDD produced for that batch interval. The number of partitions created per consumer is therefore determined by the block interval - the interval at which data received by Spark Streaming receivers is chunked into blocks. It can be set with the spark.streaming.blockInterval property.
So, the number of partitions created per consumer can be calculated with this expression:
number of partitions per consumer = batchInterval / blockInterval
Now, the total number of partitions per batch, across all consumers, is:
number of partitions per batch = (# consumers) * (batchInterval / blockInterval)
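As a worked example with assumed values: a 10-second batch interval and a 200ms block interval (the default) give 10,000ms / 200ms = 50 partitions per consumer, so 4 consumers produce 200 partitions per batch. The block interval is set like any other Spark property:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumed values: 10s batches cut into 200ms blocks -> 50 partitions per consumer.
val conf = new SparkConf().set("spark.streaming.blockInterval", "200ms")
val ssc = new StreamingContext(conf, Seconds(10))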
Memory Consumption
This is another important factor that I have seen really impact the performance of an application. It is important to understand the memory consumption of the datasets used for computation, as well as the cost of accessing and disposing of them.
Default Executor Memory
By default, the amount of memory used per executor process is only 512MB, which might not be enough for most applications dealing with large datasets. This amount is controlled by the spark.executor.memory config property, which can be set in the properties file (spark-defaults.conf by default) or supplied at runtime to the spark-submit script, like below:
./bin/spark-submit --executor-memory 4g
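For reference, the equivalent entry in spark-defaults.conf (using the same 4g value as an example) is a single key-value line:

spark.executor.memory 4g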
Data Serialization
Because of the in-memory nature of most Spark computations, serialization plays an important role in the performance of an application. If the data formats used in the application are slow to serialize, they will greatly slow down its computational performance. By default Spark uses "Java Serialization", which is very flexible and works with most classes, but it is also very slow. We use Kryo serialization instead, based on the Kryo library, which is much more compact and faster than Java serialization but does not support all Serializable types.
You can switch to using Kryo by initializing your job with a SparkConf object.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
This setting configures the serializer used not only for shuffling data between worker nodes but also for serializing RDDs to disk. For best performance, Kryo also requires that classes be registered in advance. If the classes are not registered, Kryo will store the full class name with each object (instead of mapping it to an ID), which wastes resources.
conf.set("spark.kryo.registrator", “com.art.spark.AvroKyroRegistrator");
The Scala snippet above shows how to register the Avro classes. This registers the use of Avro's specific binary serialization for the Clickstream class.
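For illustration, here is a minimal sketch of what such a registrator might look like; the class and package names mirror the snippet above, and Clickstream is assumed to be the Avro-generated class. It simply registers the class with Kryo - a binding such as chill-avro can be plugged in where Avro's own binary encoding is required:

package com.art.spark

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Sketch only: registering the class maps it to a compact numeric ID,
// so Kryo no longer writes the full class name with every object.
class AvroKyroRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Clickstream]) // Clickstream: assumed Avro-generated class
  }
}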