Apache Spark Transformation Operations

Like Spark RDDs, input DStream transformations in Apache Spark allow the incoming data to be modified. DStreams support many of the transformations available on normal Spark RDDs.

In this blog, we will learn several Spark transformation operations. Basically, we will cover the common streaming operations, namely map(), flatMap(), filter(), reduceByKey(), countByValue(), and updateStateByKey().

Along the way, we will walk through transformation examples that you can reuse in your own Spark jobs.


Types of Spark Transformation Operations

Here, we discuss the most common streaming transformation operations, namely map(), flatMap(), filter(), reduceByKey(), countByValue(), and updateStateByKey(), each with an example.


a. map()

Basically, map(func) returns a new DStream by passing each element of the source DStream through the function func.

For Example

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val config = new SparkConf().setMaster("local[2]").setAppName("MapOpTest")
val ssc = new StreamingContext(config, Seconds(1))
val words = ssc.socketTextStream("localhost", 9999)
val answer = words.map { line => ("hello", line) }    // pair "hello" with each input line
answer.print()
ssc.start()    // Start the computation
ssc.awaitTermination()    // Wait for termination
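
To try this out, you can feed the socket with netcat (for example, nc -lk 9999 in a separate terminal) and type a line such as learning spark; each batch would then print output roughly like:

(hello,learning spark)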

b. flatMap()

The flatMap() operation is similar to map(), the only difference being that each input item can be mapped to zero or more output items.

For Example

val data = ssc.socketTextStream("localhost", 9999)
val words = data.flatMap(_.split(" "))    // split each line into words on spaces
val pairs = words.map(x => (x, 1))
val wordCount = pairs.reduceByKey(_ + _)
wordCount.print()
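
Note the difference from map(): data.map(_.split(" ")) would yield a DStream whose elements are whole arrays of words, while flatMap() flattens each array into individual words. With the pipeline above, typing hello spark hello into the socket in one batch would print something like:

(hello,2)
(spark,1)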

c. filter()

Basically, filter(func) returns a new DStream containing only the records of the source DStream on which func returns true.

For Example

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val output = words.filter { x => x.startsWith("s") }    // keep only the words that start with "s"
output.print()
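
For instance, typing spark is simple into the socket would print only the words that begin with "s", roughly:

spark
simple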

d. reduceByKey(func, [numTasks])

In Spark, when called on a DStream of (K, V) pairs, reduceByKey(func) returns a new DStream of (K, V) pairs in which the values for each key are aggregated using the given reduce function.

For Example

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCount = pairs.reduceByKey(_ + _)
wordCount.print()
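
The optional numTasks argument in the heading controls the number of parallel tasks used for the grouping, for example pairs.reduceByKey(_ + _, 4). With the pipeline above, typing hello hello spark into the socket in one batch would print something like:

(hello,2)
(spark,1)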

e. countByValue()

In Spark, when called on a DStream of elements of type K, countByValue() returns a new DStream of (K, Long) pairs, where the value of each key is its frequency in each RDD of the source DStream.

For Example

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
words.countByValue().print()
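
In effect, this is equivalent to the map-then-reduceByKey pattern from the previous example, except that the counts come back as Long. Typing hello hello spark into the socket would print something like:

(hello,2)
(spark,1)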

f. updateStateByKey()

The updateStateByKey() operation allows you to maintain arbitrary state while continuously updating it with new information. Using it involves two steps:

  • Define the state: it can be an arbitrary data type.

  • Define the state update function: a function that specifies how to update the state using the previous state and the new values from the input stream.

In addition, Spark will apply the state update function to all existing keys in every batch, regardless of whether they have new data in that batch or not. If the update function returns None, the key-value pair is eliminated.

For Example

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Combine the counts from the current batch with the running count held in state
def updateFunc(values: Seq[Int], state: Option[Int]): Option[Int] = {
  val currentCount = values.sum
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}

val conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyTest")
val ssc = new StreamingContext(conf, Seconds(10))
val lines = ssc.socketTextStream("localhost", 9999)
ssc.checkpoint("/home/asus/checkpoints/")    // updateStateByKey requires a checkpoint directory
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val globalCountStream = pairs.updateStateByKey(updateFunc)
globalCountStream.print()
ssc.start()    // Start the computation
ssc.awaitTermination()    // Wait for termination
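
Because the state persists across batches, the counts accumulate over time. Typing spark into the socket in the first batch and spark again in a later batch would print something like:

(spark,1)    // after the first batch
(spark,2)    // after the later batch containing "spark"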

Conclusion

Hence, we have discussed the most common transformation operations in Spark Streaming. We hope these examples will help you in your further Spark jobs.