
Practical Apache Spark in 10 minutes. Part 2 - RDD

Igor Bobriakov 31 October, 2018 (4 min read)

Spark’s primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). It is a fault-tolerant collection of elements that can be operated on in parallel. RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs.

Creating RDD

Spark provides two ways to create RDDs: loading an external dataset and parallelizing a collection in your driver program.

The simplest way to create RDDs is to take an existing collection in your program and pass it to SparkContext’s parallelize() method.

For example, this is how it looks in different programming languages:

Scala

val input = sc.parallelize(List(1, 2, 3, 4))

Python

numbers = sc.parallelize([1, 2, 3, 4])

Java

JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4));

The other way is to read from a file:

Scala

val lines = sc.textFile("README.md")

Python

lines = sc.textFile("README.md")

Java

JavaRDD<String> lines = sc.textFile("README.md");

RDD Operations

RDDs support two types of operations: transformations and actions. Transformations create a new RDD from an existing one, while actions return a result to the driver program. All transformations in Spark are lazy: they do not compute their results right away, but simply remember the transformations applied to the base dataset (or file). The transformations are only computed when an action requires a result to be returned to the driver program or written to storage.
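As a rough plain-Python analogy (not Spark itself), generator expressions are lazy in the same way: building one does no work, and values are only produced when a consumer asks for them, just as transformations are only evaluated when an action runs.

```python
data = [1, 2, 3, 4]
squared = (x ** 2 for x in data)   # "transformation": builds a recipe, computes nothing yet
result = list(squared)             # "action": forces the computation to run
print(result)  # [1, 4, 9, 16]
```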

Transformations

Let’s create an RDD of numbers and apply some transformations to it. We will be using PySpark for this example.

Small tip: if you want to suppress the Spark logging output, do the following:

Pyspark

sc.setLogLevel("ERROR")

Pyspark

num = sc.parallelize([4, 6, 6, 1, 3, 0, 2, 2, 2])

The map(function) transformation returns a new RDD, applying a function to each element of the original one.

Pyspark

result = num.map(lambda x: x**2) 

For now, Spark has only remembered the transformations. To get the actual result, we need to use an action, like take(n), which returns the first n elements from the RDD.

Pyspark

result.take(10)
[16, 36, 36, 1, 9, 0, 4, 4, 4]

The filter(function) transformation returns a new RDD, retaining only the elements for which the function evaluates to true.

Pyspark

result = num.filter(lambda x: x >= 3)
result.take(10)
[4, 6, 6, 3]

The distinct() transformation returns a new RDD, removing all the duplicates from the original dataset.

Pyspark

result = num.distinct()
result.take(10)
[0, 1, 2, 3, 4, 6]

If we have two RDDs, we can apply transformations involving both of them. Let's create a second RDD:

Pyspark

num2 = sc.parallelize([5, 5, 8, 2, 2, 1, 7, 3, 3]) 

The union(other) transformation returns a new dataset, which contains all elements from both RDDs.

Pyspark

result = num.union(num2)
result.take(20)
[4, 6, 6, 1, 3, 0, 2, 2, 2, 5, 5, 8, 2, 2, 1, 7, 3, 3]

The intersection(other) transformation returns a dataset containing only the elements found in both RDDs (duplicates are removed).

Pyspark

result = num.intersection(num2)
result.take(20)
[2, 1, 3]

The subtract(other) transformation returns the elements of the original RDD that do not appear in the other RDD.

Pyspark

result = num.subtract(num2)
result.take(20)
[0, 4, 6, 6]

We can also compute the Cartesian product of two datasets. The cartesian(other) transformation returns a dataset of all pairs (a, b), where a belongs to the original dataset and b to the other.

Pyspark

result = num.cartesian(num2)
result.take(20)
[(4, 5), (4, 5), (4, 8), (4, 2), (4, 2), (4, 1), (4, 7), (4, 3), (4, 3), (6, 5), (6, 5), (6, 8), (6, 2), (6, 2), (6, 1), (6, 7), (6, 3), (6, 3), (6, 5), (6, 5)]

Actions

As we’ve mentioned earlier, actions return some value. For example, we can count elements in the dataset using the simple command:

Pyspark

num.count()
9

The collect() action returns all the elements of the RDD to the driver program as a list.

Pyspark

num.collect()
[4, 6, 6, 1, 3, 0, 2, 2, 2]

The top(n) action returns the n largest elements from the RDD.

Pyspark

num.top(3)
[6, 6, 4] 

The takeOrdered(n) action returns the first n elements in ascending order.

Pyspark

num.takeOrdered(5)
[0, 1, 2, 2, 2]

The most common action on an RDD is reduce(function), which takes a function operating on two elements of the RDD and returning a single element of the same type.

Pyspark

num.reduce(lambda x, y: x + y)
26

Now, let's take a look at the fold() action, which is similar to reduce() and behaves in much the same way, but additionally takes a zero value that is used for the initial call.

Pyspark

num.fold(0, lambda x,y : x + y)
26

The aggregate() action frees us from the constraint of having the return type be the same as the type of the RDD we are working on. Let's take a closer look at this function and walk through a simple example step by step:

Pyspark

num = sc.parallelize([4, 6, 6, 1, 3, 1, 2, 2, 2])
prodCount = num.aggregate((1, 0),
     (lambda tup, value: (tup[0] * value, tup[1] + 1)),
     (lambda tup, other_tup: (tup[0] * other_tup[0], tup[1] + other_tup[1])))
prodCount
(3456, 9)

(1, 0) is the starting (zero) value; here it is a tuple that accumulates the running product and the element count. The first lambda function takes the accumulator tuple and one value as input, while the second merges two accumulator tuples.
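To make the mechanics concrete, here is a plain-Python sketch (not Spark) of what aggregate computes: the first function folds each partition starting from the zero value, and the second function merges the per-partition results. The split of the data into two partitions below is an arbitrary assumption for illustration.

```python
from functools import reduce

def aggregate(partitions, zero, seq_op, comb_op):
    # Fold each partition with seq_op starting from the zero value,
    # then merge the per-partition results with comb_op.
    per_partition = [reduce(seq_op, part, zero) for part in partitions]
    return reduce(comb_op, per_partition, zero)

# Same data as above, split into two hypothetical partitions.
parts = [[4, 6, 6, 1], [3, 1, 2, 2, 2]]
result = aggregate(
    parts,
    (1, 0),
    lambda tup, value: (tup[0] * value, tup[1] + 1),
    lambda tup, other_tup: (tup[0] * other_tup[0], tup[1] + other_tup[1]),
)
print(result)  # (3456, 9)
```

The result does not depend on how the data is partitioned, which is exactly why Spark requires the merge function to be associative.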

RDD of key-value pairs

Spark provides special operations on RDDs containing key-value pairs. These RDDs are called pair RDDs. Pair RDDs are a useful building block in many programs, as they expose operations that allow you to act on each key in parallel or regroup data across the network.
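As a small preview of what acting on each key means, here is a plain-Python sketch (not the distributed version) of what an operation like Spark's reduceByKey computes: the values for each key are folded together with the supplied function.

```python
def reduce_by_key(pairs, func):
    # Fold the values for each key with func -- a plain-Python sketch
    # of what Spark's reduceByKey computes, minus the distribution.
    acc = {}
    for key, value in pairs:
        acc[key] = func(acc[key], value) if key in acc else value
    return sorted(acc.items())

pairs = [("a", 1), ("b", 2), ("a", 3), ("b", 5)]
totals = reduce_by_key(pairs, lambda x, y: x + y)
print(totals)  # [('a', 4), ('b', 7)]
```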

Pair RDDs support many of the same operations as ordinary RDDs, along with operations specific to key-value data. That’s the topic of our next blog post.

Conclusion

In this second article in our series of tutorials on Apache Spark, we’ve guided you through RDDs, Apache Spark's primary abstraction. Use the RDD programming guide to learn more about the commands and operations you can use.

In the next article, we will talk about DataFrames in Apache Spark.

Originally published on datascience-school.com