Ways To Create RDD In Spark with Examples

An RDD is a read-only, partitioned collection of records that lets developers work with data efficiently. In this article, we will learn about the several ways to create an RDD in Spark.

These are: 1. using a parallelized collection, 2. from an existing Apache Spark RDD, and 3. from external datasets. We will dive into each of these methods with a few examples to understand them in depth.

Ways to Create RDD in Spark

The Spark RDD is the core abstraction of Apache Spark. RDD stands for Resilient Distributed Dataset. RDDs are immutable in nature and support self-recovery, i.e. the fault-tolerance or resilience property that gives them their name.

An RDD is a logically partitioned collection of objects, usually stored in memory, that can be operated on in parallel. We can apply different operations to an RDD, as well as to data in external storage, to form new RDDs from it.

We can also build RDDs manually, by caching an existing RDD and dividing it into partitions ourselves. Users may persist an RDD in memory so that it can be reused efficiently across parallel operations.
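The sketch below is a minimal illustration of persisting an RDD for reuse, assuming an existing SparkContext named sc (the variable names are illustrative, not part of any API).

– For Example:

val cached = sc.parallelize(1 to 1000).cache()   // persist this RDD in memory

println(cached.count())   // the first action computes the RDD and caches it

println(cached.sum())     // later actions reuse the cached data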

RDDs are a read-only, partitioned collection of records: since we cannot modify an RDD once it is created, it behaves predictably under race conditions and other failure scenarios.

There are two types of operations we can perform on RDDs. The first is transformations, which create a new dataset from an existing RDD.

The second is actions, which return a value to the driver program after a computation on the dataset completes. In short, a transformation returns a new RDD, whereas an action returns a plain value (such as a number or a collection).
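As a minimal sketch of this difference (assuming an existing SparkContext named sc; the variable names are illustrative):

– For Example:

val numbers = sc.parallelize(1 to 10)   // base RDD

val doubled = numbers.map(_ * 2)        // transformation: returns a new RDD

val total = doubled.reduce(_ + _)       // action: returns a plain value (110)

println(total)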

Having learned about the Apache Spark RDD, we will now move on to how RDDs are created.

There are the following ways to create an RDD in Spark:

1. Using a parallelized collection.

2. From external datasets (referencing a dataset in an external storage system).

3. From existing Apache Spark RDDs.

We will now learn each of these ways to create an RDD in detail.

1. Using Parallelized collection

RDDs are most commonly created with the parallelize method: we take an existing collection in our driver program (written in Scala, Python, or Java) and call SparkContext's parallelize() method on it.

This is the most basic method to create an RDD and is typically applied at the very initial stage of a Spark program. It creates the RDD quickly and lets us start operating on it right away. However, to use this method, the entire dataset must fit on one machine.

Due to this property, this process is rarely used outside of testing and prototyping.

Consider the following example of the sortByKey() method. In this program, the values to be sorted are supplied through a parallelized collection:

– For Example:

val data = spark.sparkContext.parallelize(Seq(("sun", 1), ("mon", 2), ("tue", 3), ("wed", 4), ("thu", 5)))

val sorted = data.sortByKey()

sorted.foreach(println)

The number of partitions the dataset is cut into is a key point for a parallelized collection, since a Spark cluster is logically partitioned. As we discussed earlier, we can also cache an RDD and divide it into partitions manually.

That means we can also set the number of partitions ourselves, by passing it as the second parameter to the parallelize method.

– For Example:

sc.parallelize(data, 20)

So here we set the number of partitions to 20 ourselves.
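To confirm the partition count, we can call getNumPartitions on the resulting RDD. A minimal sketch, assuming data is a local Scala collection such as a range of numbers:

– For Example:

val data = 1 to 100

val partitioned = sc.parallelize(data, 20)

println(partitioned.getNumPartitions)   // prints 20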

The example below again applies the parallelize method and sets the number of partitions ourselves:

– For Example:

val rdd1 = spark.sparkContext.parallelize(Array("sun","mon","tue","wed","thu","fri"),4)

val result = rdd1.coalesce(3)

result.foreach(println)

So, this is an entry-level way to create our own RDDs, and it lets us create them very quickly. It is generally used by users to create small datasets of their own.

2. From external datasets (Referencing a dataset in an external storage system)

Spark can create RDDs from any storage source supported by Hadoop, including our local file system. Apache Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

We can create text-file RDDs with SparkContext's textFile method. This method takes a URI for the file (either a local path on the machine, or an hdfs://, s3n://, etc. URI) and reads the whole file as a collection of lines.

Be careful that if a local path is used, the file must be accessible at the same path on every worker node: it should be available at the same place on the local file system of each worker.

We can either copy the file to all worker nodes or use a network-mounted shared file system.
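A minimal sketch of SparkContext's textFile method (the path shown is just a placeholder):

– For Example:

val lines = spark.sparkContext.textFile("path/of/text/file")   // RDD[String], one element per line

println(lines.count())   // action: number of lines in the file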

To load a dataset from an external storage system (such as a file system or a key-value store), we can also use the DataFrameReader interface. It supports many file formats, such as:

a. csv(String path) Example

In this example, we are providing a CSV file; reading it returns a Dataset[Row] as a result.

import org.apache.spark.sql.SparkSession

object DataFormat {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder.appName("ExtDataEx1").master("local").getOrCreate()

    val dataRDD = spark.read.csv("path/of/csv/file").rdd

  }

}

Note – In this example the .rdd method is used; it converts the Dataset[Row] into an RDD[Row].

b. json(String path) Example

In this example, we are providing a JSON file (one object per line), which returns a Dataset[Row] as a result.

val dataRDD = spark.read.json("path/of/json/file").rdd

c. textFile(String path) Example

In this example, we are providing a text file, which returns a Dataset[String] as a result.

val dataRDD = spark.read.textFile("path/of/text/file").rdd

3. From existing Apache Spark RDDs

As we discussed earlier, an RDD is immutable, so we cannot change anything in it. Instead, we create new RDDs from existing ones. This process of deriving another dataset from an existing one is called a transformation.

As a result, a transformation always produces a new RDD. Because RDDs are immutable, no changes take place in them once they are created, which keeps the data consistent across the cluster.

Some of the operations performed on RDDs are map, filter, count, distinct, flatMap, etc.

– For Example:

In this example, a new RDD of (first character, word) pairs is created from an existing RDD of words:

val words = spark.sparkContext.parallelize(Seq("sun", "rises", "in", "the", "east", "and", "sets", "in", "the", "west"))

val wordPair = words.map(w => (w.charAt(0), w))

wordPair.foreach(println)

Note – In the above example, the RDD "wordPair" is created from the existing RDD "words" using the map() transformation. Each element of the result pairs a word with its starting character.
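As a further sketch, other transformations such as filter() also produce new RDDs from an existing one (reusing the words RDD defined above):

– For Example:

val eastWest = words.filter(w => w == "east" || w == "west")   // transformation: new RDD from words

println(eastWest.distinct().count())   // action: prints 2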

Conclusion

Hence, we have learned all the ways to generate a Spark RDD in depth: from a parallelized collection, from external datasets, and from an existing Apache Spark RDD. We have also seen that we can cache an RDD and partition it manually.

These datasets should no longer be difficult to work with, and this will improve our efficiency when working with Resilient Distributed Datasets.