
Apache Spark Performance Tuning: Learn How to Tune

In this tutorial, we will learn the basic concepts of Apache Spark performance tuning. Tuning is the process of adjusting a Spark application so that it runs smoothly and does not bottleneck on any cluster resource.

This blog covers Spark performance tuning in detail, i.e. how to tune our Apache Spark jobs. We will study Spark's data serialization libraries: Java serialization and Kryo serialization.

We will also cover data structure tuning and data locality, along with garbage collection tuning and memory tuning, to understand the topic better.

Ultimately, we will see how performance tuning keeps a Spark application running efficiently.

Apache Spark Performance Tuning

The picture above shows the key aspects of performance tuning in Apache Spark.

Introduction – Performance Tuning in Apache Spark

Tuning is the process of making our Spark program execute efficiently. Since computations are in-memory, a Spark job can bottleneck on any resource in the cluster: CPU, network bandwidth, or memory.

Generally, if the data fits in memory, the bottleneck is network bandwidth. To reduce memory usage, we may also need to store Spark RDDs in serialized form; serializing data improves network performance as well.

Tuning, then, is the method of adjusting settings for the memory and instances used by the system. It helps ensure optimal performance of Spark and prevents resource bottlenecking.

When done properly, Spark performance tuning brings noticeable benefits, such as faster job execution and better use of cluster resources.

Spark Performance Tuning – Data Serialization

To optimize a Spark application, we should always start with data serialization. It plays a vital role in the performance of any distributed application and is one of the most important aspects of Apache Spark performance tuning.

Formats that are slow to serialize objects into, or that consume a large number of bytes, will greatly slow down the computation. Spark aims to strike a balance between convenience and performance.

Convenience here means being able to work with any Java type in our operations. Spark supports two serialization libraries:

– Java serialization
– Kryo serialization

Let's study each one in detail.

1. Java Serialization

By default, Spark serializes objects using Java's ObjectOutputStream framework. It can work with any class we create that implements java.io.Serializable.

We can also control the performance of serialization more closely by extending java.io.Externalizable. Java serialization is flexible, but it is often quite slow and leads to large serialized formats for many classes.

2. Kryo Serialization

Apart from Java serialization, Spark can also use the Kryo library (version 2) to serialize objects. Kryo is significantly faster and more compact than Java serialization, often as much as 10x.

However, it does not support all Serializable types, and it requires us to register, in advance, the classes we use in the program for best performance.

– We can switch to Kryo by initializing our job with a SparkConf and calling

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

– To register our own custom classes with Kryo, we use the following method:

registerKryoClasses

– For example:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClassname1], classOf[MyClassname2]))
val sc = new SparkContext(conf)

If our objects are large, we may also need to increase the spark.kryoserializer.buffer config. This value needs to be large enough to hold the largest object we will serialize.
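
As a small sketch (reusing the conf object from the example above; the sizes shown are illustrative, not recommendations), the buffer settings can be raised like this:

// Illustrative values: raise the initial Kryo buffer and its upper bound
conf.set("spark.kryoserializer.buffer", "1m")
conf.set("spark.kryoserializer.buffer.max", "128m")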

Memory Tuning In Apache Spark Performance Tuning

While tuning memory usage, there are three considerations: the amount of memory used by our objects (we may want the entire dataset to fit in memory), the cost of accessing those objects, and the overhead of garbage collection (if there is high turnover in terms of objects).

Java objects are fast to access, but they can easily consume a factor of 2-5x more space than the "raw" data inside their fields. There are several reasons behind this: each distinct Java object carries an object header of roughly 16 bytes, Java Strings store their characters in a separate array with extra metadata, and common collection classes wrap every entry (and box every primitive) in a separate object.

1. Data Structure Tuning

We can decrease memory consumption by avoiding Java features that add overhead. Possible ways include: preferring arrays of objects and primitive types over the standard Java or Scala collection classes, avoiding nested structures with many small objects and pointers, and using numeric IDs or enumeration objects instead of strings for keys.
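
The snippet below is only a rough illustration of the first and third points (nothing here comes from the original article):

// An array of primitive Ints stores the values contiguously, with no per-element object
val compact: Array[Int] = Array(1, 2, 3, 4, 5)

// A LinkedList of boxed Integers adds an object header and a node pointer per element
val heavy = new java.util.LinkedList[Integer]()
(1 to 5).foreach(i => heavy.add(Int.box(i)))

// Prefer numeric IDs over strings for keys where possible
val userId: Long = 42L    // instead of "user-42"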

2. Garbage Collection Tuning

When a program has a huge "churn" in the RDDs it stores, JVM garbage collection becomes a problem: the JVM must trace through all our Java objects, find the unused ones, and evict old objects to create space for new ones.

The major point is that the cost of garbage collection grows with the number of Java objects, so using data structures with fewer objects (for example, an array of Ints instead of a LinkedList) greatly lowers this cost. As discussed earlier, an even better method is to persist objects in serialized form: there will then be only one object (a byte array) per RDD partition.
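
As a hedged example of the serialized-persistence idea (someRdd is a made-up name for an existing RDD):

import org.apache.spark.storage.StorageLevel

// Persist the RDD as serialized byte arrays: one large object per partition,
// which is far cheaper for the garbage collector to track
val cached = someRdd.persist(StorageLevel.MEMORY_ONLY_SER)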

– Measuring the Impact of GC

In GC tuning, the first step is to gather statistics on how often garbage collection occurs and how long it takes. We can do this by adding -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to the Java options. The next time our Spark job runs, a message will be printed in the worker's logs each time a garbage collection occurs.
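
One way to pass these flags to the executors (a sketch, assuming a SparkConf named conf as in the earlier examples) is through the spark.executor.extraJavaOptions setting:

// Print GC details in each executor's logs
conf.set("spark.executor.extraJavaOptions",
  "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")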

Note:

These logs will be on the cluster's worker nodes, not in the driver program's logs.

– Advanced GC Tuning

To tune GC further, we need some basic information about memory management in the JVM: the heap is divided into a young generation (short-lived objects) and an old generation (longer-lived objects), the young generation is further split into Eden and two survivor regions, a minor GC runs when Eden fills up, and a full GC runs when the old generation is close to full.

Spark Performance Tuning – Overview of Memory management

There are two categories where Spark uses memory heavily: storage and execution. Storage memory is used for caching and propagating internal data over the cluster, while execution memory is used for computation in shuffles, joins, sorts, and aggregations.

Storage and execution share a unified region in Spark, denoted by "M". When no execution memory is used, storage can acquire all the available memory, and vice versa. Execution may evict storage if necessary, but only until total storage memory usage falls under a certain threshold "R".

In other words, R defines a sub-region within M where cached blocks are never evicted. Storage, however, may not evict execution, due to complexities in the implementation.

– This design gives us several useful properties. First, applications that do not use caching can use the entire space for execution. Second, applications that do use caching can reserve a minimum storage space (R) where their data blocks are immune to eviction.

In addition, this design offers reasonable out-of-the-box performance for a variety of workloads, without requiring the user to know how memory is divided internally.

Although there are two relevant configurations, spark.memory.fraction and spark.memory.storageFraction, the typical user should not need to adjust them, as the default values are applicable to most workloads.
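
If they do need adjusting, a minimal sketch looks like this (the values shown are only illustrative):

// Usually unnecessary: the defaults suit most workloads
conf.set("spark.memory.fraction", "0.6")        // fraction of the heap used for M
conf.set("spark.memory.storageFraction", "0.5") // fraction of M reserved as R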

Spark Performance Tuning – Determining memory consumption

The best way to size the memory consumption of a dataset is to create an RDD from it, put the RDD into the cache, and look at the "Storage" page in the web UI. That page shows how much memory the RDD is occupying.
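
A small sketch of this workflow (the input path is hypothetical): cache the RDD, trigger an action so it is materialized, and then check the "Storage" page of the web UI.

// Cache and materialize an RDD, then inspect its size on the web UI's Storage page
val data = sc.textFile("hdfs:///path/to/input")
data.cache()
data.count()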

Apart from that, if we want to estimate the memory consumption of a particular object, we can use SizeEstimator's estimate method. This is helpful for experimenting with different data layouts to trim memory usage.

It can also be used to calculate the amount of space a broadcast variable will occupy on each executor's heap.
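
A minimal sketch of SizeEstimator (the sample object here is made up for illustration):

import org.apache.spark.util.SizeEstimator

// Estimate the in-memory footprint of an object, in bytes
val sample = Array.fill(1000)("some string value")
println(SizeEstimator.estimate(sample))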

Spark Performance Tuning – Other Considerations

1. Level of Parallelism

Clusters will not be fully utilized unless the level of parallelism for each operation is high enough. Spark automatically sets the number of "map" tasks to run on each file according to its size.

For distributed "reduce" operations, such as groupByKey and reduceByKey, it uses the largest parent RDD's number of partitions.

We can also pass the level of parallelism as a second argument to such operations, or set the spark.default.parallelism config property to change the default. In general, 2-3 tasks per CPU core in the cluster is recommended.
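
A brief sketch of both options (pairs is a hypothetical (key, value) RDD and the numbers are illustrative):

// Pass the level of parallelism as a second argument to a reduce operation...
val counts = pairs.reduceByKey(_ + _, 200)

// ...or change the default for all shuffles (set on the SparkConf before creating the SparkContext)
conf.set("spark.default.parallelism", "200")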

2. Memory usage of Reduce Task

If the working set of one of our tasks, such as one of the reduce tasks in groupByKey, is too large, we may get an OutOfMemoryError. Shuffle operations build a hash table within each task to perform the grouping, which can often be large.

Shuffle operations include sortByKey, groupByKey, reduceByKey, join, and many more. The simplest fix is to increase the level of parallelism so that each task's input set is smaller.

Because Spark reuses one executor JVM across many tasks and has a low task launching cost, it can efficiently support tasks as short as 200 ms. So we can safely increase the level of parallelism to more than the number of cores in the cluster.

3. Broadcasting large variables

We can decrease the size of each serialized task by using the broadcast functionality available in SparkContext. This also reduces the cost of launching a job over the cluster.

Spark prints the serialized size of each task on the master, so we can look there to decide whether our tasks are too large; in general, tasks larger than about 20 KB are probably worth optimizing.
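
A hedged sketch of broadcasting (lookupTable and someRdd are hypothetical names):

// Broadcast a large lookup table once per executor instead of shipping it
// inside every task closure
val lookupTable = Map("a" -> 1, "b" -> 2)
val broadcastTable = sc.broadcast(lookupTable)

val resolved = someRdd.map(key => broadcastTable.value.getOrElse(key, 0))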

4. Data locality

Data locality has a major impact on the performance of Spark jobs. Computation is fastest when the data and the code that processes it are together; if they are separated, one must move to the other.

Since code size is much smaller than data, it is typically faster to ship the serialized code from place to place than to move a chunk of data. Spark builds its scheduling around this general principle of data locality.

In other words, data locality is how close data is to the code processing it. Based on the data's current location, there are several levels of locality. In order from closest to farthest, they are PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, and ANY.
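
Spark exposes a related setting, spark.locality.wait, which controls how long the scheduler waits for a more local slot before falling back to a less local one. A minimal sketch (the value is illustrative):

// Wait time before the scheduler gives up on a locality level and falls back
conf.set("spark.locality.wait", "3s")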

Conclusion

This tutorial has covered the main concerns of tuning. As we have seen, performance tuning plays a vital role in Spark, especially data serialization and memory tuning.

For many programs, switching to Kryo serialization and persisting data in serialized form will solve the most common performance issues.

Hence, careful performance tuning ensures that a Spark application makes good use of the system's resources.
