Stateful Transformations in Spark Streaming

Apache Spark ships with several modules, each serving a different purpose, and the streaming API is one of its most powerful. It lets developers work with continuous streams of data through an abstraction called the Discretized Stream, or DStream.

One of Spark Streaming's notable capabilities is stateful transformations. This tutorial focuses on that particular part of Spark Streaming: the stateful transformations API.

Before diving into stateful transformations, we will briefly introduce Spark Streaming, checkpointing for stateful streaming, and key-value pairs, and then cover the stateful transformation methods updateStateByKey and mapWithState in detail.

[Figure: Stateful transformation with windowing in Spark Streaming]

What is Spark Streaming

Apache Spark introduced Spark Streaming in 2013. It is an extension of the core Spark API that offers scalable, high-throughput, fault-tolerant stream processing of live data streams.

Data can be ingested from many sources, for example Kafka, Apache Flume, Amazon Kinesis, or TCP sockets, and processed with complex algorithms expressed through high-level functions such as map, reduce, join, and window.

Finally, processed data can be pushed out to filesystems as well as live dashboards.

Its key abstraction is the DStream, which represents a stream of data divided into small batches. DStreams integrate with other Apache Spark components, for example Spark MLlib and Spark SQL.

What is Stateful Transformation

A stateful transformation is a particular feature of Spark Streaming that enables us to maintain state between micro-batches. In other words, it maintains state across a period of time, which can be as long as an entire streaming session, and allows us to build sessionization of our data. This is achieved by checkpointing the streaming application; plain windowing may not require checkpoints, but other windowed operations may.

In other words, these are operations on DStreams that track data across time: they use some data from previous batches to generate the results for a new batch.

Checkpointing for Stateful Streaming

Through the mechanism of checkpointing, the framework guarantees fault tolerance throughout the complete lifespan of our Spark job. Spark ensures a clean way to recover while we operate 24/7.

With checkpointing, recovery stays under our control even after a network failure or a data-center crash. If we use checkpointing, it is performed at an interval of our choosing to a persistent data store, for example Amazon S3, HDFS, or Azure Blob Storage.

Checkpointing is optional for non-stateful transformations. For stateful streams, however, it is mandatory to provide a checkpointing directory, otherwise the application will not start.

Providing a checkpoint directory is easy: simply call checkpoint on the StreamingContext with the directory location:

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Duration, StreamingContext}

val sparkContext = new SparkContext()
val ssc = new StreamingContext(sparkContext, Duration(4000)) // 4-second batch interval
ssc.checkpoint("path/to/persistent/storage")

The important thing is that as long as we haven't modified the existing code, the checkpointed data remains usable and suitable for recovering from a job failure. If we modify our code for any reason, the checkpointed data is no longer compatible and must be deleted for the job to be able to start.

Stateful Transformation – Key-value pairs 

Stateful transformations require that we operate on a DStream that encapsulates a key-value pair, in the form DStream[(K, V)], where K signifies the type of the key and V the type of the value.

This allows Spark to shuffle data based on the key, so that all data for a given key is available on the same worker node, which in turn lets us do meaningful aggregations.
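For example, a raw text stream can be keyed before any stateful transformation is applied. The following is a minimal sketch; the socket source, the host/port, and the "userId,eventName" record format are assumptions made purely for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Minimal sketch: turn a raw text stream into a DStream[(K, V)] so that
// key-based stateful transformations can be applied to it.
val conf = new SparkConf().setAppName("StatefulExample").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(4))

// Hypothetical source emitting lines of the form "userId,eventName"
val lines = ssc.socketTextStream("localhost", 9999)

// Key each record by user id, producing a DStream[(String, String)]
val eventsByUser = lines.map { line =>
  val Array(userId, eventName) = line.split(",", 2)
  (userId, eventName)
}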

Stateful Transformations methods in Spark Streaming (updateStateByKey and mapWithState)

a. updateStateByKey:

Before Spark 1.6.0, the only stateful transformation available was PairDStreamFunctions.updateStateByKey. In its simplest form, its signature looks like this:

def updateStateByKey[S](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)]

The updateStateByKey transformation needs a function which accepts:

  • Seq[V] – a list of new values received in the current batch for the given key.
  • Option[S] – the state we are updating on every iteration.

For our job's first invocation the state is going to be None, which signifies the first batch for the given key. Afterwards, it's up to us to manage its value.

As soon as we're done with a particular state for a given key, we need to return None. That is the signal to Spark that we don't need the state anymore.

An implementation for a user-session scenario, where we accumulate each user's UserEvents into a UserSession, may look like this:

def updateUserEvents(newEvents1: Seq[UserEvent],
                     state: Option[UserSession]): Option[UserSession] = {
  /*
   Append the new events to the state. If this is the first time we're invoked
   for the key, fall back to creating a new UserSession with the new events.
  */
  val newState1 = state
    .map(prev => UserSession(prev.userEvents ++ newEvents1))
    .orElse(Some(UserSession(newEvents1)))

  /*
   If the current batch contains the `isLast` event, save the accumulated session
   to the underlying store and return None to delete the state. Otherwise, return
   the accumulated state so we can keep updating it in the next batch.
  */
  if (newEvents1.exists(_.isLast)) {
    saveUserSession(newState1) // save the state including this batch's events
    None
  } else newState1
}

At each batch we want to take the state for the given user and combine the old events and the new events into a new Option[UserSession]. To check whether this is the end of the user's session, we look in the newly arrived sequence for the isLast flag on any of the user events.

When we receive the last message, we save the user session and return None, which indicates we're done. If there is no end message, we simply return the newly created state for the next iteration.
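Wiring this function into a stream is then a one-liner. The sketch below assumes userEvents is a DStream[(String, UserEvent)] keyed by user id and that a checkpoint directory has already been set as shown earlier:

// Hypothetical pair DStream of (userId, UserEvent); checkpointing must be enabled.
val userSessions = userEvents.updateStateByKey(updateUserEvents _)

// Each batch emits the current accumulated UserSession per user id.
userSessions.print()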

i. Limitations of updateStateByKey

  • The transformation iterates over the entire state store for every incoming batch, regardless of whether a new value for a given key has been consumed or not. This affects performance, especially when dealing with a large amount of state over time.
  • There is no built-in timeout mechanism.
  • The return type of updateStateByKey must be the same as the type of the state we are storing.

ii. Introducing mapWithState

In Spark 1.6.0, mapWithState, the successor to updateStateByKey, was introduced as an experimental API. It includes some new features that were missing from updateStateByKey.

Features of mapWithState

  • Built-in timeout mechanism 

It gives us the ability to keep the state for a key even when no new data arrives for it. When the timeout hits, the function is invoked one last time with a special flag.

  • Partial updates 

In the current batch, only keys for which new data has arrived are iterated, which is a great performance optimization.

  • Choose your return type 

Regardless of what type our state object holds, we can choose a return type of our liking.

  • Initial state 

We can supply a custom RDD to initialize our stateful transformation on startup.

Signature for mapWithState:

def mapWithState[StateType, MappedType](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType]

Whereas updateStateByKey required us to pass a function operating on a Seq[V] and an Option[S], we now need to pass a StateSpec, which wraps the mapping function.

Differences between mapWithState and updateStateByKey in Stateful Transformations

  • Our key, which wasn't exposed previously, is now passed to the mapping function.
  • Incoming new values now arrive as an Option[V] instead of a Seq[V].
  • Our state is now encapsulated in an object of type State[StateType].
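Putting these pieces together, a sketch of the same user-session logic expressed with mapWithState might look as follows. UserEvent, UserSession, saveUserSession and userEvents are the same hypothetical pieces used in the updateStateByKey example above:

import org.apache.spark.streaming.{Minutes, State, StateSpec}

// The mapping function now receives the key, the new value as an Option,
// and the state wrapped in a State[UserSession] object.
def trackUserSession(userId: String,
                     newEvent: Option[UserEvent],
                     state: State[UserSession]): Option[UserSession] = {
  if (state.isTimingOut()) {
    // Timeout fired: no new data arrived for this key; flush what we accumulated.
    state.getOption().foreach(s => saveUserSession(Some(s)))
    None
  } else {
    val events  = state.getOption().map(_.userEvents).getOrElse(Seq.empty) ++ newEvent.toSeq
    val session = UserSession(events)

    if (newEvent.exists(_.isLast)) {
      saveUserSession(Some(session))
      state.remove()
      None
    } else {
      state.update(session)
      Some(session)
    }
  }
}

// The StateSpec carries the mapping function plus the extras, such as the timeout
// (and, optionally, .initialState(rdd) to seed the state from a custom RDD).
val spec = StateSpec.function(trackUserSession _).timeout(Minutes(30))

val sessions = userEvents.mapWithState(spec)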

Windowed Transformation

Windowed operations require two parameters: the window duration and the sliding duration. Both must be multiples of the StreamingContext's batch interval. The window duration controls how many previous batches of data are considered, namely windowDuration / batchInterval. The sliding duration controls how frequently the new DStream computes results.
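As a sketch, assuming ipDStream1 is a DStream[(String, Int)] of (ip, 1) pairs (the same hypothetical stream used in the reduceByKeyAndWindow example below) and a batch interval of 10 seconds, an explicit window looks like this:

import org.apache.spark.streaming.Seconds

// Every 10 seconds (slide duration) look back over the last 30 seconds of data
// (window duration), i.e. windowDuration / batchInterval = 3 batches.
val windowedCounts = ipDStream1
  .window(Seconds(30), Seconds(10)) // windowDuration, slideDuration
  .reduceByKey(_ + _)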

Two operations allow us to perform reductions on each window efficiently: reduceByWindow() and reduceByKeyAndWindow(). They take a single reduce function, such as +, to run over the whole window.

They also have a special form that lets Spark compute the reduction incrementally, by considering only the data entering and leaving the window. This form requires an inverse of the reduce function, such as - for +. If your function has an inverse, this is much more efficient for large windows.

val ipCountDStream1 = ipDStream1.reduceByKeyAndWindow(
  {(x, y) => x + y}, // adding elements from the new batches entering the window
  {(x, y) => x - y}, // removing elements from the oldest batches exiting the window
  Seconds(30),       // window duration
  Seconds(10))       // slide duration

Like Spark RDDs, DStreams are lazily evaluated: if we do not apply an output operation to a DStream or any of its descendants, those DStreams will not be evaluated. Moreover, if no output operations are registered on a StreamingContext, the context will not start.
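A minimal sketch, reusing the hypothetical ssc and windowedCounts values from the earlier snippets:

// print() is an output operation; it forces evaluation of the DStream lineage.
windowedCounts.print()

ssc.start()            // start receiving and processing data
ssc.awaitTermination() // block until the job is stopped or fails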

Spark Streaming Stateful Transformations – Conclusion

In this blog, we have covered the general use of stateful transformations in Apache Spark. The new mapWithState transformation in particular brings a lot of power to end-users.

It makes working with stateful data in Spark much more pleasant, while preserving the guarantees Spark brings, such as fault tolerance, resiliency, and distribution.

As a result, it improves the overall efficiency of the system.