Practical Apache Spark in 10 minutes. Part 2 - RDD
Spark’s primary abstraction is a distributed collection of items called a Resilient Distributed Dataset (RDD). It is a fault-tolerant collection of elements which allows parallel operations upon itself. RDDs can be created from Hadoop InputFormats (such as HDFS files) or by transforming other RDDs.
Spark provides two ways to create RDDs: loading an external dataset and parallelizing a collection in your driver program.
The simplest way to create RDDs is to take an existing collection in your program and pass it to SparkContext’s
For example, in different programming languages it will look like this:
val input = sc.parallelize(List(1, 2, 3, 4))
numbers = sc.parallelize([1, 2, 3, 4])
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4))
The other way is to read from a file:
val lines = sc.textFile("README.md")
lines = sc.textFile("README.md")
JavaRDD<String> lines = sc.textFile("README.md")
RDDs support two types of operations: transformations and actions. Transformations are operations on RDDs, which create a new RDD from existing one. Actions return a result to the driver program. All transformations in Spark are lazy. This means, they do not compute their result right away, they just remember all the transformations applied to the base dataset (or a file). Transformations are only computed when an action requires a result to be returned to driver program, or written to the storage.
Let’s create an RDD vector and do some transformations with it. We will be using Pyspark for this example.
Small tip: if you want to suppress the Spark logging output, do the following:
num = sc.parallelize([4, 6, 6, 1, 3, 0, 2, 2, 2])
map(function) transformation returns a new RDD, applying a function to each element of the original one.
result = num.map(lambda x: x**2)
For now, Spark has only remembered the transformations. To get the actual result we need to use an action. Like
take(), which take the specified number of element from the RDD.
[16, 36, 36, 1, 9, 0, 4, 4, 4]
filter(function) transformation returns a new RDD, retaining only those, for which function is evaluated to true.
result = num.filter(lambda x: x >= 3) result.take(10)
[4, 6, 6, 3]
distinct() transformation returns a new RDD, removing all the duplicates from the original dataset.
result = num.distinct() result.take(10)
[0, 1, 2, 3, 4, 6]
In case we have two RDDs, we can do some transformations to them too. Let's create a new RDD:
num2 = sc.parallelize([5, 5, 8, 2, 2, 1, 7, 3, 3])
union(other) transformation returns a new dataset, which contains all elements from both RDDs.
result = num.union(num2) result.take(20)
[4, 6, 6, 1, 3, 0, 2, 2, 2, 5, 5, 8, 2, 2, 1, 7, 3, 3]
intersection(other) returns a dataset, which contains only elements found in both RDDs.
result = num.intersection(num2) result.take(20)
[2, 1, 3]
subtract(other) transformation removes all contents of the other RDD.
result = num.subtract(num2) result.take(20)
[0, 4, 6, 6]
We can also compute a Cartesian product of two datasets. The
cartesian(other) transformation returns a dataset of all pairs (a, b), where a belongs to original dataset, and b to other.
result = num.cartesian(num2) result.take(20)
[(4, 5), (4, 5), (4, 8), (4, 2), (4, 2), (4, 1), (4, 7), (4, 3), (4, 3), (6, 5), (6,5), (6, 8), (6, 2), (6, 2), (6, 1), (6, 7), (6, 3), (6, 3), (6, 5), (6, 5)]
As we’ve mentioned earlier, actions return some value. For example, we can count elements in the dataset using the simple command:
Count occurrences of elements in RDD. This action returns a dictionary of (
[4, 6, 6, 1, 3, 0, 2, 2, 2]
top returns a number of top elements from the RDD
[6, 6, 4]
takeOrdered returns a number of elements in ascending order
[0, 1, 2, 2, 2]
The most common action upon RDD is
reduce(function), which takes a function operating on two elements from RDD returning one element of the same type.
num.reduce(lambda x, y: x + y)
Now, let's take a look at the
fold() action, which is similar to
reduce() and acts pretty much the same, but allows to take the zero value for the initial call.
num.fold(0, lambda x,y : x + y)
aggregate() function frees us from the constraint of having the return be the same type as the RDD we are working on. Let's take a closer look at this function and walk through the simple example step by step:
num = sc.parallelize([4, 6, 6, 1, 3, 1, 2, 2, 2]) sumCount = num.aggregate((1, 0), (lambda tup, value: (value * tup, tup + 1), (lambda tup, value_tup: (value_tup * tup, value_tup + tup))) sumCount
(1,0) is a starting value, here it is a tuple which we are going to use. First
lambda() function takes tuple and one value as an input, the second function in its turn, takes two tuples as an input.
RDD of key-value pairs
Spark provides special operations on RDDs containing key-value pairs. These RDDs are called pair RDDs. Pair RDDs are a useful building block in many programs, as they expose operations that allow you to act on each key in parallel or regroup data across the network.
RDDs of key-value pairs also support some operations like RDDs. That’s the topic of our next blog post.
In this second article in the line of tutorials about working with Apache Spark, we’ve guided you through the Apache Spark's RDD which is its primary abstraction. Use RDD programming guide to learn more about commands and operations you can use.
In the next article, we will talk about Data Frames in Apache Spark.
Originally published on datascience-school.com