Lab 14: Spark
Due at 11:59pm on 08/06/2015.
Starter Files
Download lab14.zip. Inside the archive, you will find starter files for the questions in this lab, along with a copy of the OK autograder.
Submission
By the end of this lab, you should have submitted the lab with python3 ok --submit. You may submit more than once before the deadline; only the final submission will be graded.
- You do not need to submit anything for this lab.
Setting up Spark and EC2
Launching EC2 Instances
To get started, we will need to launch EC2 instances. First, download the authentication keys from Piazza. Move the zip archive into your lab14 folder and then extract the files. You should now have credentials.csv and cs61a. You may need to move these two files into the lab14 folder.
Next, edit the email.txt file and add your email address to it.
If a certain region hits the limit on the number of clusters, try another region. But before you do, make sure you terminate your cluster with python3 run_spark.py --destroy.
Now we will start four EC2 instances. This process will take a long time (at least 5 minutes). Do not terminate the launching process while it is in progress. The launching script will install Spark onto each of the instances. Use:
python3 run_spark.py --launch
Viewing the Web GUI
Spark automatically creates a website that you can visit to check the status of your cluster, and the progress for a particular job. Use:
python3 run_spark.py --website
No jobs are currently running, so the jobs website will not load. However, once we start launching programs in this lab, the website will show progress information.
Downloading the Dataset
Once the instances have launched, we will need to download the dataset onto them. Use:
python3 run_spark.py --download_dataset
Terminating EC2 Instances
After you have finished the lab, you must terminate the EC2 instances. We are billed at an hourly rate. Use:
python3 run_spark.py --destroy
MapReduce
In this lab, we'll be working with MapReduce, a programming paradigm developed by Google, which allows a programmer to process large amounts of data in parallel on many computers.
A computation in MapReduce consists of two components: the mapper and the reducer.
- The mapper takes an input file and outputs a series of key-value pairs, like:

  age 29
  name cecilia
  job gradstudent
  salary 42

- The reducer takes the (sorted) output from the mapper and outputs a single value for each key. The mapper's output will be sorted according to the key.
In the example above, the key-value pairs are:
- age: 29
- name: cecilia
- job: gradstudent
- salary: 42
The following diagram summarizes the entire MapReduce pipeline:
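In code, the same map, sort/group, and reduce steps can be sketched on a single machine in plain Python. This is a toy illustration only; Spark runs these steps in parallel across many machines. Every mapper and reducer you write in this lab plugs into a pipeline of this shape.

from collections import defaultdict
from functools import reduce

def run_mapreduce(records, mapper, reducer):
    # Map: turn every record into a (key, value) pair.
    pairs = [mapper(record) for record in records]
    # Shuffle/sort: group the values by key, in sorted key order.
    groups = defaultdict(list)
    for key, value in sorted(pairs):
        groups[key].append(value)
    # Reduce: repeatedly combine each key's values into a single value.
    return [(key, reduce(reducer, values)) for key, values in groups.items()]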
Spark and EC2
Spark is a framework that builds on MapReduce. The AMPLab first developed this system to improve upon Hadoop, an open-source implementation of MapReduce. In this lab, we will run Spark on Amazon EC2 instances, which will demonstrate how you can write programs that harness parallel processing. EC2, or Elastic Compute Cloud, allows customers to rent servers for their own use. We will rent a couple of servers on an hourly basis. (We are paying for this, so please do not waste resources.)
Yelp Academic Dataset Analysis
In this section, we introduce the framework for mappers and reducers in Spark. We will be analyzing the Yelp academic dataset that was used in the maps project.
Example: Counting Reviews
Our first exercise will be counting the number of reviews for each distinct restaurant in our dataset, which is a sequence of reviews. A review contains the following attributes:

- review.text: The text of the review.
- review.stars: The number of stars the user gave the restaurant. It is an integer from 1 to 5 (inclusive).
- review.business_id: A unique restaurant identifier, such as 1CBs84C-a-cuA3vncXVSAw (La Val's Pizza).
To formulate this as a MapReduce problem, we need to define an appropriate mapper and reducer function.
Recall what the mapper does: for each review in our dataset, the mapper outputs a key-value pair. The mapper function takes in one review at a time. What should our key-value pairs be for our example?
- key: To group our data by restaurant, we will use the business_id of each review as our key.
- value: To count each review exactly once, we will use 1 as our value.
For example, given the following dataset
Review('I hate the food here. Avoid at all costs.', 1, 'id1')
Review('So good. #icanteven', 5, 'id1')
Review('This place is good. Sometimes.', 3, 'id2')
Review('What kind of restaurant only serves one type of pizza?!?', 2, 'id3')
Review('They make some awesome pizza here. One type a day.', 4, 'id3')
(notice there are 5 reviews), the mapper outputs a sequence of key-value pairs like this:
('id1', 1)
('id1', 1)
('id2', 1)
('id3', 1)
('id3', 1)
During the reducing step, we combine all values that share the same key using the reducer. The reducer is a combining function that takes in two values associated with the same key and returns a single value. Spark will repeatedly call the reducer on the values until they have been reduced to a single value. The output of this step is a sequence of key-value pairs, where the value is the result of the reducing step.
('id1', 2)
('id2', 1)
('id3', 2)
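To make "repeatedly call the reducer" concrete, here is a toy plain-Python illustration of how the two values that share the key 'id1' are folded into one. It mirrors the idea only; it is not how Spark is invoked.

from functools import reduce

def reducer(val1, val2):
    return val1 + val2

values_for_id1 = [1, 1]                  # both values emitted with key 'id1'
print(reduce(reducer, values_for_id1))   # reducer(1, 1) == 2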
Let's examine the code for our mapper and reducer. The code is provided in yelp/count_reviews.py.
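The provided mapper and reducer should look roughly like the following sketch (illustrative only; the actual code in yelp/count_reviews.py may differ in its details):

def mapper(review):
    # Key: the restaurant's id; value: 1, so each review is counted once.
    return (review.business_id, 1)

def reducer(val1, val2):
    # Add up the counts for reviews of the same restaurant.
    return val1 + val2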
You can run the example by using

python3 run_spark.py --count_reviews

This will copy the scripts to the EC2 instances. It will then shard the data and process the shards in parallel. The output will be saved to the file spark-count-reviews.out/part-00000.
Question 1: Counting Stars
Now let's count how many reviews have a certain rating. For example, how many 4-star ratings are there? Write a mapper and a reducer to calculate this. Think about what the keys and the values represent. You will need to edit the count_by_stars.py file.
def mapper(review):
    # Key: the review's star rating; value: 1, so each review is counted once.
    return (review.stars, 1)

def reducer(val1, val2):
    # Combine two counts for the same star rating by adding them.
    return val1 + val2
To test your code, run

python3 run_spark.py --count_by_stars

The output will be in spark-count-by-stars.out/part-00000. It should look something like this:
(2, 1950)
(4, 6395)
(1, 1054)
(3, 3989)
(5, 4008)
Question 2: Counting Stars in SQL
Now let's use a simple SQL query to do the same computation: counting the number of reviews that have a certain rating. You will need to edit the count_by_stars_sql.py file.
def sql_query():
    # Group the reviews by star rating and count the reviews in each group.
    return 'SELECT stars, count(*) FROM reviews GROUP BY stars'
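For context, here is a rough, self-contained sketch of how a query like this can be run with Spark 1.x's SQL API. The app name, the stand-in dataset, and the wiring below are assumptions for illustration; the lab's run_spark.py handles all of this for you.

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext(appName='count_by_stars_sql_demo')   # hypothetical app name
sqlContext = SQLContext(sc)

# A tiny stand-in dataset; the real script loads the Yelp reviews instead.
rows = sc.parallelize([Row(stars=5, text='So good.'),
                       Row(stars=1, text='Avoid at all costs.'),
                       Row(stars=5, text='Great pizza.')])
sqlContext.createDataFrame(rows).registerTempTable('reviews')

result = sqlContext.sql('SELECT stars, count(*) FROM reviews GROUP BY stars')
print(result.collect())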
To test your code, run

python3 run_spark.py --count_by_stars_sql

The output will be in spark-count-by-stars-sql.out/part-00000. It should look something like this:
Row(stars=1, c1=1054)
Row(stars=2, c1=1950)
Row(stars=3, c1=3989)
Row(stars=4, c1=6395)
Row(stars=5, c1=4008)
Question 3: Analyzing Positivity
We will now try to analyze how "positive" certain words are. Positive words, such as "good", "excellent", and "delicious", should appear in positive reviews more frequently than in negative ones. We will rank positivity using a number scale from 1 to 5, where 1 represents negative words and 5 represents positive words.
To do so, we will analyze each unique word in a review. The mapper should output a list of (key, value) pairs, where the key is a word, and the value is the number of stars associated with the review. For example:
Review('this is good', 5, 'id1') => [('this', 5), ('is', 5), ('good', 5)]
Review('this is bad', 2, 'id1') => [('this', 2), ('is', 2), ('bad', 2)]
The reducer will take in a sequence of values associated with the same key. For example, one reducer call might take in all of the ratings associated with the word "this". The reducer will then combine all of the values by taking the average. Continuing the same example:
('this', [5, 2]) => 3.5
('is', [5, 2]) => 3.5
('good', [5]) => 5
('bad', [2]) => 2
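Spark does the grouping for you; conceptually it behaves like this small plain-Python sketch (toy code for intuition only):

from collections import defaultdict

def avg(values):
    values = list(values)
    return sum(values) / len(values)

mapper_output = [('this', 5), ('is', 5), ('good', 5),
                 ('this', 2), ('is', 2), ('bad', 2)]

# Group the ratings by word, then reduce each group to its average.
groups = defaultdict(list)
for word, stars in mapper_output:
    groups[word].append(stars)
print({word: avg(ratings) for word, ratings in groups.items()})
# {'this': 3.5, 'is': 3.5, 'good': 5.0, 'bad': 2.0}

Now fill in the mapper and reducer in analyze_positivity.py: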
def mapper(review):
    # REGEX and avg are provided elsewhere in the starter file.
    words = list(set(REGEX.sub(' ', review.text.lower()).split()))
    # Pair every unique word in the review with the review's star rating.
    return [(word, review.stars) for word in words]

def reducer(values):
    # Average all of the star ratings associated with the same word.
    return avg(values)
To test your code, use

python3 run_spark.py --analyze_positivity

The output will be in spark-analyze-positivity-out/part-00000. It should look something like this:
(u'awesome', 4.1554524361948957)
(u'so', 3.5585770983389682)
(u'into', 3.5268373245251858)
(u'salads', 3.666015625)
Question 4: Extra for Experts
We can now use the calculated word positivities to try to predict the star ratings for each review in the much larger Yelp academic dataset. For each review, we'll analyze the unique words in the text. We'll use the average positivity value of the words to rate each review.
You will need to modify the predict_ratings.py file. The mapper and reducer will be the same as in analyze_positivity.py. You will only need to define the predictor function. The predictor takes in a review and returns three values: the review's text, its actual rating, and your predicted rating.
def mapper(review):
    # Same as in analyze_positivity.py.
    words = set(REGEX.sub(' ', review.text.lower()).split())
    return [(word, review.stars) for word in words]

def reducer(values):
    # Same as in analyze_positivity.py.
    return avg(values)

def make_predictor(positivities):
    def predictor(review):
        words = set(REGEX.sub(' ', review.text.lower()).split())
        # Predict a rating by averaging the positivity of every known word in the review.
        return (review.text, review.stars,
                avg(positivities[w] for w in words if w in positivities))
    return predictor
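As a sanity check, here is a hypothetical, self-contained usage of make_predictor with made-up positivity values. The Review constructor, REGEX, and avg below are stand-ins for what the starter file provides.

import re
from collections import namedtuple

REGEX = re.compile(r'[^a-z0-9 ]')   # stand-in for the starter file's REGEX
Review = namedtuple('Review', ['text', 'stars', 'business_id'])

def avg(values):
    values = list(values)
    return sum(values) / len(values)

positivities = {'good': 4.2, 'awesome': 4.8, 'bad': 1.9}   # made-up values
predict = make_predictor(positivities)
print(predict(Review('The pizza here is awesome, the salad is good.', 5, 'id3')))
# ('The pizza here is awesome, the salad is good.', 5, 4.5)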
To test your code, use

python3 run_spark.py --predict_ratings

The output will be in spark-predict-ratings.out/part-00000. The first column contains the reviews' texts. The second column represents the actual ratings. The last column represents your predicted ratings. The closer the numbers are, the better. It should look something like:
(u"I'm a fan of soft serve ...", 5.0, 4.1235340109460514)
(u"Pretty great! Okay, so this place ...)", 4.0, 3.6952811547386157)
(u'The Tale of the 4-Starred, Up Close and Personal Bar...', 4.0, 3.0033519553072625)
Terminating EC2 Instances
We hope you enjoyed the lab. Please terminate your EC2 instances to save costs. Use:
python3 run_spark.py --destroy