Spark Shared Variables: Broadcast Variables and Accumulators

1. Objective

The concept of a Spark shared variable is simple: it is a variable that we want to share across the cluster. In this blog, we focus on shared variables in Spark and on their two types, broadcast variables and accumulators. To understand each in detail, we will explain both with examples.

2. What is Shared Variable in Spark

Generally, when a function is passed to a Spark operation, it executes on a remote cluster node. It works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are sent back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, for two common usage patterns, Spark provides two limited types of shared variables:

  1.  Broadcast Variables
  2.  Accumulators
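
Before looking at each type, the copy semantics described above can be illustrated with a minimal sketch. The object name ClosureCopySketch, the sample data, and the local[2] master are assumptions made for illustration only. The Spark documentation treats the driver-side value of a variable mutated inside a closure as undefined, so the sketch does not rely on it and uses the result of an action instead.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ClosureCopySketch {
  // Returns (driver-side counter after foreach, sum returned by reduce).
  def run(sc: SparkContext): (Int, Int) = {
    val numbers = sc.parallelize(Seq(1, 2, 3, 4))

    // Each task increments its own copy of `counter`. Those updates are not
    // sent back to the driver, so the driver's `counter` must not be relied upon.
    var counter = 0
    numbers.foreach(x => counter += x)

    // An action such as reduce() returns its result to the driver explicitly.
    val total = numbers.reduce(_ + _)
    (counter, total)
  }

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption for trying this out without a cluster.
    val conf = new SparkConf().setAppName("closure-copy-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```

Only the second element of the returned pair is dependable. Shared variables exist for the cases where returning a result through an action is not convenient.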

Now let’s discuss each of them in detail:

2.1. Broadcast Variables in Spark

Broadcast variables allow the programmer to keep a read-only variable cached on each machine, rather than shipping a copy of it with tasks. We can use them, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms, which helps reduce communication cost.

Spark actions execute through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. Data broadcast this way is cached in serialized form and deserialized before running each task. Hence, creating broadcast variables explicitly is useful only when tasks across multiple stages need the same data, or when caching the data in deserialized form is important.

We can create a broadcast variable from a variable v by calling the SparkContext.broadcast(v) method. The broadcast variable is a wrapper around v, and we can access its value by calling the value method.

For Example:

scala> val broadcastVar1 = sc.broadcast(Array(1, 2, 3))
broadcastVar1: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

scala> broadcastVar1.value
res0: Array[Int] = Array(1, 2, 3)

After we create a broadcast variable, we should use it instead of the value v in any functions run on the cluster, so that v is not shipped to the nodes more than once. It is also important that the object v is not modified after it is broadcast. This ensures that all nodes get the same value of the broadcast variable.
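
As a sketch of using the broadcast variable, rather than v itself, inside a function, the example below looks up country codes in a broadcast map. The object name BroadcastLookupSketch, the lookup table, and the local[2] master are hypothetical and chosen only for illustration; a real table would be large enough to justify broadcasting.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastLookupSketch {
  // Resolves country codes to names using a broadcast lookup table.
  def run(sc: SparkContext): Seq[String] = {
    // Hypothetical lookup table standing in for a large read-only dataset.
    val countryNames = Map("IN" -> "India", "US" -> "United States", "FR" -> "France")
    val bcNames = sc.broadcast(countryNames)

    val codes = sc.parallelize(Seq("IN", "FR", "XX", "US"))

    // The closure refers to bcNames.value, not to countryNames, so the map
    // is not shipped again with every task.
    codes.map(code => bcNames.value.getOrElse(code, "unknown")).collect().toSeq
  }

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption for trying this out without a cluster.
    val conf = new SparkConf().setAppName("broadcast-lookup-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try run(sc).foreach(println) finally sc.stop()
  }
}
```

The map is immutable here, which matches the advice above not to modify v after it has been broadcast.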

2.2. Accumulators

Accumulators are variables that are only “added” to through a commutative and associative operation, and can therefore be supported efficiently in parallel. We can use accumulators to implement counters or sums. Spark natively supports accumulators of numeric types, and programmers can add support for new types.

As a user, we can create named or unnamed accumulators. A named accumulator is displayed in the web UI for the stage that modifies it. Spark displays the value of each accumulator modified by a task in the “Tasks” table.

Tracking accumulators in the UI is useful for understanding the progress of running stages.

We can create a numeric accumulator by calling SparkContext.longAccumulator() or SparkContext.doubleAccumulator() to accumulate values of type Long or Double, respectively. Tasks running on a cluster can then add to it by using the add method. However, they cannot read its value. Only the driver program can read the accumulator value, by using its value method.

For Example:

In this code we are using an accumulator to add up the elements of an array:

scala> val accum1 = sc.longAccumulator("Accumulator1")
accum1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(Accumulator1), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum1.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum1.value
res2: Long = 10
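
Beyond the built-in numeric accumulators, support for a new type can be added by subclassing AccumulatorV2 and registering the instance with the SparkContext. The sketch below is one possible illustration, not a prescribed implementation: the class name DistinctStringsAccumulator, the accumulator name, the sample words, and the local[2] master are all hypothetical. Set union is commutative and associative, so it fits the accumulator contract.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Hypothetical accumulator that collects the distinct strings added to it.
class DistinctStringsAccumulator extends AccumulatorV2[String, Set[String]] {
  private val seen = mutable.Set.empty[String]

  override def isZero: Boolean = seen.isEmpty

  override def copy(): DistinctStringsAccumulator = {
    val acc = new DistinctStringsAccumulator
    acc.seen ++= seen
    acc
  }

  override def reset(): Unit = seen.clear()

  override def add(v: String): Unit = seen += v

  // Called on the driver to combine the partial result of a finished task.
  override def merge(other: AccumulatorV2[String, Set[String]]): Unit =
    seen ++= other.value

  override def value: Set[String] = seen.toSet
}

object CustomAccumulatorSketch {
  def run(sc: SparkContext): Set[String] = {
    val distinctWords = new DistinctStringsAccumulator
    // A custom accumulator must be registered before tasks can use it.
    sc.register(distinctWords, "distinctWords")

    // Updates happen inside an action (foreach), and only the driver reads the value.
    sc.parallelize(Seq("spark", "scala", "spark", "rdd")).foreach(w => distinctWords.add(w))
    distinctWords.value
  }

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption for trying this out without a cluster.
    val conf = new SparkConf().setAppName("custom-accumulator-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```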

For accumulator updates performed inside actions only, Spark guarantees that each task’s update to the accumulator is applied only once. In other words, restarted tasks will not update the value. In transformations, however, each task’s update may be applied more than once if tasks or job stages are re-executed.

Accumulators do not change Spark’s lazy evaluation model. If an accumulator is updated within an operation on an RDD, its value is only updated once that RDD is computed as part of an action. Accordingly, accumulator updates are not guaranteed to execute when they are made within a lazy transformation such as map().
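
A small sketch can make both points concrete: an update made inside map() is not applied until an action runs, and it is applied again if the same uncached RDD is recomputed by a second action. The object name LazyAccumulatorSketch, the accumulator name, and the local[2] master are assumptions for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyAccumulatorSketch {
  // Returns the accumulator value seen by the driver:
  // (before any action, after one action, after a second action).
  def run(sc: SparkContext): (Long, Long, Long) = {
    val accum = sc.longAccumulator("lazySum")

    // map() is a lazy transformation, so nothing is added to accum yet.
    val mapped = sc.parallelize(Seq(1, 2, 3, 4)).map { x => accum.add(x); x }
    val beforeAction: Long = accum.value

    // The first action computes the RDD and applies the updates.
    mapped.count()
    val afterFirstAction: Long = accum.value

    // mapped is not cached, so a second action recomputes it and the
    // updates inside map() are applied a second time.
    mapped.count()
    val afterSecondAction: Long = accum.value

    (beforeAction, afterFirstAction, afterSecondAction)
  }

  def main(args: Array[String]): Unit = {
    // local[2] is an assumption for trying this out without a cluster.
    val conf = new SparkConf().setAppName("lazy-accumulator-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try println(run(sc)) finally sc.stop()
  }
}
```

In a local run without task retries this should yield (0, 10, 20). When an exact count matters, performing the update inside an action such as foreach is the safer choice.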

3. Conclusion

In this article, we have seen two ways of sharing objects in Spark. A broadcast variable holds read-only data that is copied to each executor node, cached there, and used for further computations. Accumulators, in turn, let tasks add to a shared value that only the driver program can read. Hopefully, through this article, you have understood the concept of shared variables.
