Overview
Teaching: 40 min
Exercises: 0 min
Questions
How do you program using Spark?
Objectives
Learn how to use PySpark to write Spark-based programs.
A Spark program typically follows a simple paradigm. Remember that an RDD is a Resilient Distributed Dataset, which is essentially a distributed collection of items. The driver program creates one or more RDDs, applies operations to transform them, then invokes some action on the transformed RDDs. These steps are outlined as follows:
1. Create an RDD from a data source, such as a text file or a local collection.
2. Apply transformations, such as map or flatMap, to produce new RDDs.
3. Invoke an action, such as take or count, to return a result to the driver program.
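A minimal sketch of this pattern (not part of the word count example that follows; the numbers are made up for illustration) might look like:
from pyspark import SparkContext
sc = SparkContext("local", "sketch")
# 1. create an RDD from a local collection
rdd = sc.parallelize([1, 2, 3, 4])
# 2. transformation: square each element (lazy; nothing is computed yet)
squares = rdd.map(lambda x: x * x)
# 3. action: collect the results back to the driver
print(squares.collect())   # [1, 4, 9, 16]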
When Spark runs a closure on a worker, any variables used in the closure are copied to that node, but are maintained within the local scope of that closure.
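For example, updating a driver-side variable from inside a closure does not behave the way you might expect, because each worker updates only its own copy (a sketch, assuming a SparkContext sc as above):
counter = 0
def increment(x):
    global counter
    counter += x   # modifies the copy on the worker, not the driver's variable
sc.parallelize(range(10)).foreach(increment)
print(counter)   # typically still 0 on the driver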
Spark provides two types of shared variables that can be interacted with by all workers in a restricted fashion: broadcast variables, which cache a read-only value in memory on every node, and accumulators, which are variables that workers can only add to, such as counters and sums.
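A minimal sketch of both (assuming a SparkContext sc; the word list and stop list here are made up for illustration):
# broadcast variable: a read-only value cached on every worker
stop = sc.broadcast({'the', 'of', 'and', 'a'})
# accumulator: workers can only add to it; the driver reads the result
skipped = sc.accumulator(0)
def keep(word):
    if word in stop.value:
        skipped.add(1)
        return False
    return True
words = sc.parallelize(['the', 'whale', 'of', 'ahab'])
print(words.filter(keep).collect())   # ['whale', 'ahab']
print(skipped.value)                  # 2, once the action above has run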
Spark applications are run as independent sets of processes, coordinated by a SparkContext
in the driver program. The context will
connect to some cluster manager (e.g. YARN) which allocates system resources.
Each worker in the cluster runs an executor, which is in turn managed by the SparkContext. The executor manages computation as well as storage and caching on its machine.
It is important to note that this is different from Hadoop, where you might submit a job from anywhere to the JobTracker, which then handles the execution on the cluster.
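Note that the example scripts below set the master to "local" directly when the SparkContext is constructed, so they run on a single machine. When submitting to a real cluster you would typically omit the master from the code and pass it to spark-submit instead, for example (a sketch; the exact master URL depends on your cluster manager):
spark-submit --master local[4] wordcount_spark.py
spark-submit --master yarn wordcount_spark.py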
To start using Spark, we have to create an RDD. The SparkContext
provides a number of methods to do this. We will use the textFile
method,
which reads a file and creates an RDD of strings, one for each line in the file.
Create a file called wordcount_spark.py
with the following code:
from pyspark import SparkContext
# create a SparkContext that runs locally, with the application name "wordcount"
sc = SparkContext("local", "wordcount")
# read the text file into an RDD of strings, one per line
text = sc.textFile('pg2701.txt')
# take() is an action that returns the first 10 elements of the RDD
print(text.take(10))
We run this using the PySpark spark-submit
command as follows:
spark-submit wordcount_spark.py
Running this program displays the first 10 entries in the RDD:
['The Project Gutenberg EBook of Moby Dick; or The Whale, by Herman Melville', '', 'This eBook is for the use of anyone anywhere at no cost and with',
'almost no restrictions whatsoever. You may copy it, give it away or', 're-use it under the terms of the Project Gutenberg License included',
'with this eBook or online at www.gutenberg.org', '', '', 'Title: Moby Dick; or The Whale', '']
We use the same splitter function we used previously to split lines correctly. The flatMap
method applies the function to all elements of the
RDD and flattens the results into a single list of words, as follows:
words = text.flatMap(splitter)
Making these changes, wordcount_spark.py
now looks like this:
from pyspark import SparkContext
import re
# remove any non-words and split lines into separate words
# finally, convert all words to lowercase
def splitter(line):
    line = re.sub(r'^\W+|\W+$', '', line)
    return map(str.lower, re.split(r'\W+', line))
if __name__ == '__main__':
    sc = SparkContext("local", "wordcount")
    text = sc.textFile('pg2701.txt')
    words = text.flatMap(splitter)
    print(words.take(10))
After running this, words will contain the individual words:
['the', 'project', 'gutenberg', 'ebook', 'of', 'moby', 'dick', 'or', 'the', 'whale']
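To see why flatMap is used here rather than map, compare the two on a couple of made-up lines (a sketch, assuming a SparkContext sc):
lines = sc.parallelize(['Call me Ishmael', 'Some years ago'])
print(lines.map(lambda l: l.split()).collect())
# [['Call', 'me', 'Ishmael'], ['Some', 'years', 'ago']]  -- one list per line
print(lines.flatMap(lambda l: l.split()).collect())
# ['Call', 'me', 'Ishmael', 'Some', 'years', 'ago']      -- flattened into words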
Now we need to perform the mapping step. This is simply a matter of applying the function lambda x: (x,1) to each element:
words_mapped = words.map(lambda x: (x,1))
Our wordcount_spark.py
program now looks like this:
from pyspark import SparkContext
import re
# remove any non-words and split lines into separate words
# finally, convert all words to lowercase
def splitter(line):
    line = re.sub(r'^\W+|\W+$', '', line)
    return map(str.lower, re.split(r'\W+', line))
if __name__ == '__main__':
    sc = SparkContext("local", "wordcount")
    text = sc.textFile('pg2701.txt')
    words = text.flatMap(splitter)
    words_mapped = words.map(lambda x: (x,1))
    print(words_mapped.take(10))
Running the program results in the mapped RDD:
[('the', 1), ('project', 1), ('gutenberg', 1), ('ebook', 1), ('of', 1), ('moby', 1), ('dick', 1), ('or', 1), ('the', 1), ('whale', 1)]
Next, the shuffling step is performed using the sortByKey
method, which does not require any arguments:
sorted_map = words_mapped.sortByKey()
Adding this to our wordcount_spark.py
program results in:
from pyspark import SparkContext
import re
# remove any non-words and split lines into separate words
# finally, convert all words to lowercase
def splitter(line):
    line = re.sub(r'^\W+|\W+$', '', line)
    return map(str.lower, re.split(r'\W+', line))
if __name__ == '__main__':
    sc = SparkContext("local", "wordcount")
    text = sc.textFile('pg2701.txt')
    words = text.flatMap(splitter)
    words_mapped = words.map(lambda x: (x,1))
    sorted_map = words_mapped.sortByKey()
    print(sorted_map.take(10))
The output that is generated is shown below. Empty strings are generated by the splitter function for blank lines. Since these sort lexically first, we now see them in the output.
[('', 1), ('', 1), ('', 1), ('', 1), ('', 1), ('', 1), ('', 1), ('', 1), ('', 1), ('', 1)]
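To see what sortByKey does on its own, here is a small sketch with a made-up list of pairs (assuming a SparkContext sc):
pairs = sc.parallelize([('whale', 1), ('ahab', 1), ('call', 1), ('ahab', 1)])
print(pairs.sortByKey().collect())
# [('ahab', 1), ('ahab', 1), ('call', 1), ('whale', 1)]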
For the reduce step, we use the reduceByKey method to apply a supplied function that merges the values for each key. In this case, the add function will perform a sum. We need to import add from the operator module in order to use it, as follows:
counts = sorted_map.reduceByKey(add)
Now wordcount_spark.py
looks like:
from pyspark import SparkContext
from operator import add # Required for reduceByKey
import re
# remove any non-words and split lines into separate words
# finally, convert all words to lowercase
def splitter(line):
    line = re.sub(r'^\W+|\W+$', '', line)
    return map(str.lower, re.split(r'\W+', line))
if __name__ == '__main__':
    sc = SparkContext("local", "wordcount")
    text = sc.textFile('pg2701.txt')
    words = text.flatMap(splitter)
    words_mapped = words.map(lambda x: (x,1))
    sorted_map = words_mapped.sortByKey()
    counts = sorted_map.reduceByKey(add)
    print(counts.take(10))
Here is the output after this step:
[('', 3235), ('dunfermline', 1), ('heedful', 6), ('circle', 24), ('divers', 4), ('riotously', 1), ('patrolled', 1), ('mad', 37), ('lapsed', 1),
('tents', 3)]
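If you prefer not to import add, the same reduction can be written with a lambda; this sketch is equivalent:
counts = sorted_map.reduceByKey(lambda a, b: a + b)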
This is very close to the final result, since we now have a count for each of the words. We can use the max method to find the word with the maximum number of occurrences. Here is the final version of wordcount_spark.py:
from pyspark import SparkContext
from operator import add
import re
# remove any non-words and split lines into separate words
# finally, convert all words to lowercase
def splitter(line):
    line = re.sub(r'^\W+|\W+$', '', line)
    return map(str.lower, re.split(r'\W+', line))
if __name__ == '__main__':
    sc = SparkContext("local", "wordcount")
    text = sc.textFile('pg2701.txt')
    words = text.flatMap(splitter)
    words_mapped = words.map(lambda x: (x,1))
    sorted_map = words_mapped.sortByKey()
    counts = sorted_map.reduceByKey(add)
    print(counts.max(lambda x: x[1]))
Here is the final output:
('the', 14620)
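As an aside, if you wanted the ten most frequent words rather than just the maximum, the takeOrdered action could be used in a similar way (a sketch, not part of the lesson's final program):
print(counts.takeOrdered(10, key=lambda x: -x[1]))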
Spark also provides the parallelize method, which distributes a local Python collection to form an RDD (obviously, a cluster is required to obtain true parallelism).
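For example (a sketch, assuming a SparkContext sc):
rdd = sc.parallelize([1, 2, 3, 4, 5])
print(rdd.count())     # 5
print(rdd.collect())   # [1, 2, 3, 4, 5]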
The following example shows how we can calculate the number of primes in a certain range of numbers. First, we define a function to check if a number is prime. This requires checking whether it is divisible by 2 or by any odd number up to its square root.
def isprime(n):
    """
    check if integer n is a prime
    """
    # make sure n is a positive integer
    n = abs(int(n))
    # 0 and 1 are not primes
    if n < 2:
        return False
    # 2 is the only even prime number
    if n == 2:
        return True
    # all other even numbers are not primes
    if not n & 1:
        return False
    # range starts with 3 and only needs to go up to the square root of n
    # for all odd numbers
    for x in range(3, int(n**0.5)+1, 2):
        if n % x == 0:
            return False
    return True
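A quick way to sanity-check this function in plain Python (no Spark needed):
print([n for n in range(20) if isprime(n)])   # [2, 3, 5, 7, 11, 13, 17, 19]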
Now we can create an RDD comprising all numbers from 0 up to (but not including) n, in this case n = 1,000,000, using the following code:
# Create an RDD of numbers from 0 to 1,000,000
nums = sc.parallelize(range(1000000))
Finally, we use the filter method to apply the function to each value, returning an RDD containing only the values that evaluate to True. We can then count these to determine the number of primes.
# Compute the number of primes in the RDD
print(nums.filter(isprime).count())
Here is the final version of the primes.py
program:
from pyspark import SparkContext
def isprime(n):
    """
    check if integer n is a prime
    """
    # make sure n is a positive integer
    n = abs(int(n))
    # 0 and 1 are not primes
    if n < 2:
        return False
    # 2 is the only even prime number
    if n == 2:
        return True
    # all other even numbers are not primes
    if not n & 1:
        return False
    # range starts with 3 and only needs to go up to the square root of n
    # for all odd numbers
    for x in range(3, int(n**0.5)+1, 2):
        if n % x == 0:
            return False
    return True
if __name__ == '__main__':
    sc = SparkContext("local", "primes")
    # Create an RDD of numbers from 0 to 1,000,000
    nums = sc.parallelize(range(1000000))
    # Compute the number of primes in the RDD
    print(nums.filter(isprime).count())
Running this program generates the correct answer:
78498
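As a check, the same count can be computed (much more slowly, on a single core) in plain Python using the isprime function above:
print(sum(1 for n in range(1000000) if isprime(n)))   # 78498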
Key Points
Spark defines an API for distributed computing using distributed data sets.
A driver program coordinates the overall computation.
An executor is a process that runs computations and stores data.
Application code is sent to the executors.
Tasks are sent to the executors to run.