Apache Spark Performance Tuning : Learn How to Tune
In this tutorial, we will learn the basic concept of Apache Spark performance tuning. The process of tuning means to ensure the flawless performance of Spark. This process also guarantees to prevent bottlenecking of resources in Spark.
This blog covers complete details about Spark performance tuning or how to tune our Apache Spark jobs. We will study, spark data serialization libraries, java serialization & kryo serialization. To learn in detail, we will focus data structure tuning and data locality. Also, includes garbage collection tuning and memory tuning to understand the topic better. Ultimately, we will learn how the method of spark performance tuning ensures the good performance of the system.
Above picture, shows the key aspects of Performance Tuning in Apache Spark.
2. Introduction – Performance Tuning in Apache Spark
Tuning is a process of ensuring that how to make our Spark program execution efficient. Since, computations are in-memory, by any resource over the cluster, code may bottleneck. Resources like CPU, network bandwidth, or memory. Generally, if data fits in memory so as a consequence bottleneck is network bandwidth. In meantime, to reduce memory usage we may also need to store spark RDDs in serialized form. Data serialization also results in good network performance also.
This is a method of adjusting settings to record for memory and instances used by the system. This method is termed as Tuning. Tuning guarantees optimal performance of Spark and prevents resource bottlenecking.
There may be good results of Spark performance tuning if done properly. likewise:
- By terminating jobs those run long.
- To ensure that jobs are on accurate execution engine.
- By using all resources in an effective manner.
- By enhancing performance time of system.
3. Spark Performance Tuning – Data Serialization
To optimize a Spark application, we should always start with data serialization. It plays a vital role in the performance of any distributed application. Also, it is a most important key aspect of Apache Spark performance tuning. There are formats which always slow down the computation. Formats such delays to serialize objects into or may consume a large number of bytes, we need to serialize them first. Spark offers a balance between convenience as well as performance. Convenience means which allow us to work with any Java type in our operations. Spark supports two serialization libraries. Such as:
- Java Serialization
- Kryo Serialization
To understand better, let’s study each one by one in detail.
3.1. Java Serialization
By using Java’s object output stream framework, Spark serializes the objects. By default, to serialize objects, Spark uses Java’s framework. Any class you create that implements java.io.Serializable, it can work with easily. By extending java.io.Externalizable, can also control the performance of your serialization. This process is much flexible in nature. But, it seems to be very slow which leads to large serialized formats for many classes.
3.2. Kryo Serialization
Apart from Java serialization, Spark also uses Kryo library (version 2) to serialize. This process even serializes more quickly, kryo is exceptionally 10x faster and more compact than Java serialization. As a consequence, it does not support all serializable types. It requires us to register the classes in advance, which we use in the program for best performance.
– we can switch to using kryo by initializing our job with a Sparkconf and calling
– while we register our own custom classes with Kryo, we need to use following method:
– For example:
val conf = new SparkConf().setMaster(...).setAppName(...) conf.registerKryoClasses(Array(classOf[MyClassname1], classOf[MyClassname2])) val sc = new SparkContext(conf)
If our objects are large, we may also need to increase the spark.kryoserializer.buffer config. To hold the largest object, we may serialize this value needs to be large enough.
4. Memory Tuning In Apache Spark Performance Tuning
While we tune memory usage, there are three considerations which strike:
- As the whole dataset needs to fit in memory, consideration of memory used by your objects is the must.
- As high turnover of objects, the overhead of garbage collection is necessary.
- We need to consider the cost of accessing those objects.
As Java objects are fast to access, it may consume a factor of 2-5x more space than the “raw” data inside their fields. There can be various reasons behind this such as:
- In Java strings, there are about 40 bytes of overhead over the raw string data. Due to string’s internal usage of UTF-16 encoding, it stores each character as two bytes. Apparently, a 10-character string may easily consume 60 bytes.
- Primitive types collections often store them as “boxed” objects. Like java.lang.Integer.
- Every Java object has an “object header” distinctly. This size is about 16 bytes and it contains information such as a pointer to its class. If there is an object which is very little data, this can be bigger than the data.
- Collection classes like HashMap and LinkedList use linked data structure. There we have “wrapper” object for every entry. In this object, it has a header and pointers (of size 8 bytes each) to the next object in the list.
4.1. Data Structure Tuning
We can decrease the memory consumption by avoiding java features that may overhead. There are following possible ways such as:
- For suppose RAM size is less than 32 GB, we can set JVM flag to –xx:+UseCompressedOops. This helps to create a pointer of four bytes instead of eight.
- By using lots of small objects and pointers we can avoid nested structures.
- We can use numeric IDs or enumerated objects rather than using strings for keys.
4.2. Garbage collection Tuning
When we have huge “churn” regarding RDDs stored by the program. Then JVM garbage collection becomes a huge problem. So, java evicts old objects to create space for new ones. It may trace through all our java objects and find the unused ones.
A major aspect is if we use data structures with fewer objects it greatly lowers this cost. For example, if we use an array of Ints instead of a linkedlist. As discussed earlier, a better method is to persist objects in the serialized form. So, now there will be only one object per RDD partition.
– Measuring the Impact of GC
In GC tuning it is important to judge the time, that how often garbage collection occurs. We can do it by adding -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to the Java options. Afterwards, when our Spark job is run, a message printed in the worker’s logs. This will happen each time a garbage collection occurs.
These logs will be on our cluster’s worker nodes not on our driver program.
– Advance GC Tuning
To tune GC furthermore, we need to know the basic information about memory management in the JVM. Such as :
- There are two regions in which Java heap space is divided. Likewise, young and old. Old generation refers to objects with longer lifetimes, while young generation meant to hold short-lived objects.
- Afterwards, the young generation is also further divided into three regions, such as Eden, Survivor1 and Survivor2.
- In simple words, while Eden is full a minor GC is run on Eden. Now objects that are alive from Eden and Survivor1 are copied to Survivor2. All the survivor areas are swapped. Ultimately, If an object is very old or survivor2 is full, it is moved to old. So, while old is near to full, a full GC is invoked.
5. Spark Performance Tuning – Overview of Memory management
There are basically two categories where we use memory largely in Spark, such as storage and execution. Storage memory, which we use for caching & propagating internal data over the cluster. While, execution memory, we use for computation in shuffles, joins, sorts, and aggregations.
Storage and execution share a unified region in Spark which is denoted by ”M”. Storage can use all the available memory if no execution memory is used and vice versa. If total storage memory usage falls under a certain threshold “R”. So, execution may evict storage if necessary.
We can also say, R defines a sub-region within M where no cached blocks are evicted. Since there may be complexities in implementation storage may not evict execution.
– There are several properties we can get by this particular design. Such as:
- If the application is not using caching, it can use whole space for execution.
- If an application does use caching, it may retain a minimum storage space” R”. That place is for their data blocks where they are immune to being evicted.
In addition, this design offers reasonable out of the box performance for a variety of workloads. Even without any need of user expertise of how memory is divided internally.
In spite of the fact, there are two relevant configurations, So there is no need for the user to adjust them. As the default values are applicable to most workloads:
- As a fraction of the (JVM heap space – 300MB) (default 0.6), spark.memory.fraction expresses the size of M. For user data structure (40%) space is held, as internal metadata in Spark. If in case of any sparse and large records that space is also for safeguarding against OOM errors.
- memory.storagefraction display the size of R as a fraction of M (default 0.5). There is a place is for data blocks (R is the storage space within M) where they are immune to being evicted.
6. Spark Performance Tuning – Determining memory consumption
To calculate the amount of memory consumption, a dataset is must to create an RDD. Now, put RDD into the cache, and view the “Storage” page in the web UI. By using that page we can judge that how much memory that RDD is occupying.
Apart from it, if we want to estimate the memory consumption of a particular object. We can do it by using sizeEstimator’s estimate method. This method is helpful for experimenting with different layouts to trim memory usage. It will also calculate the amount of space a broadcast variable occupy on each executor heap.
7. Spark Performance Tuning – Other Considerations
7.1. Level of Parallelism
Until we set the high level of parallelism for operations, Clusters will not be utilized. In Spark, it automatically set the number of “map” tasks to run on each file according to its size. For distributed “reduce” operations it uses the largest parent RDD’s number of partitions. Distributed operations likewise groupByKey and reduceByKey.
We can also pass the level of parallelism as a second argument. To set the config property use spark.default.parallelism to change the default. Although, we approve 2-3 tasks per CPU core in our cluster.
7.2. Memory usage of Reduce Task
If working set of our tasks, like one of the reduce tasks in groupByKey, is too large, then it may show error. That error pop up the message OutOfMemoryError. Shuffle operations make a hash table within each task to form the grouping, which can often be large. Shuffle operations can besortByKey, groupByKey, reduceByKey, join & many more. To make sure that each task’s input set is smaller, just need to increase the level of parallelism.
As we reuse one executor JVM across many tasks, it has low task launching cost. It means if tasks are short as 200 ms, spark supports it efficiently. Apparently, we can increase the level of parallelism to more than the number of cores in your clusters.
7.3. Broadcasting large variables
We can easily decrease the size of each serialized task. It is possible by using broadcast functionality available in sparkcontext. This also reduces the cost of launching a job over the cluster. To judge the size of the serialized size of each task, we can follow the master. We can say the tasks which are greater than 20 KB are probably worth optimizing.
7.4. Data locality
For the performance of spark Job, Data locality implies major impact. It enhances the performance of spark jobs. Computations take place faster if data and code both operate together. If anyone of them is separated, one must move to other. As code size is much smaller than data, it is faster to ship serialized code from place to place. Scheduling of spark builds around this basic principle of data locality.
In other words, Data locality means how close data is to the code processing it. On the basis of data’s current location, we have various levels of locality. According to order from closest to farthest, they are list-up below:
- If PROCESS_LOCAL data is in the same JVM as the running code that is the best possible locality.
- As data travel between processes is quite slower than PROCESS_LOCAL. It is must that NODE_LOCAL data is on the same node.
- We can access NO_PREF equally faster from anywhere. Therefore, it has no locality preference.
- Due to data needs to be sent over the network, as data is on a different server on the same rack. It should be done in single switch also. So RACK_LOCAL data is on the same rack of servers.
- ANY data retain anywhere else on the network and not in the same rack.
8. Spark Performance Tuning – Conclusion
This tutorial is all about the main concerns about tuning. As we know spark performance tuning plays a vital role in spark. Importantly, spark performance tuning application- data serialization and memory tuning. There are several programs switching to Kryo serialization solves the big issue. Persisting data in serialized form will also solve most common performance issues. Hence, the method of spark performance tuning ensures the good performance of the system.
Reference – Apache Spark
Leave a comment.