Due at 11:59pm on 08/06/2015.

Starter Files

Download lab14.zip. Inside the archive, you will find starter files for the questions in this lab, along with a copy of the OK autograder.

Submission

By the end of this lab, you should have submitted the lab with python3 ok --submit. You may submit more than once before the deadline; only the final submission will be graded.

  • You do not need to submit anything for this lab.

Setting up Spark and EC2

Launching EC2 Instances

To get started, we will need to launch EC2 instances. First download the authentication keys from Piazza. Move the zip archive into your lab14 folder and then extract the files. You should now have credentials.csv and cs61a. You may need to move these two files into the lab14 folder.

Next, edit the email.txt file and add your email address to it.

If a certain region hits the limit on the number of clusters, try another region. But before you do, make sure you terminate your cluster with python3 run_spark.py --destroy.

Now we will start four EC2 instances. This process will take a long time (at least 5 minutes). Do not terminate the launching process while it is in progress. The launching script will install Spark onto each of the instances. Use:

python3 run_spark.py --launch

Viewing the Web GUI

Spark automatically creates a website that you can visit to check the status of your cluster, and the progress for a particular job. Use:

python3 run_spark.py --website

No jobs are currently running, so the jobs website will not load. However, once we start launching programs in this lab, the website will show progress information.

Downloading the Dataset

Once the instances have launched, we will need to download the dataset onto them. Use:

python3 run_spark.py --download_dataset

Terminating EC2 Instances

After you have finished the lab, you must terminate the EC2 instances. We are billed at an hourly rate. Use:

python3 run_spark.py --destroy

MapReduce

In this lab, we'll be working with MapReduce, a programming paradigm developed by Google, which allows a programmer to process large amounts of data in parallel on many computers.

A computation in MapReduce consists two components: the mapper and the reducer.

  • The mapper takes an input file, and outputs a series of key-value pairs:
  • The reducer takes the (sorted) output from the mapper, and outputs a single value for each key. The mapper's output will be sorted according to the key.

    age 29
    name cecilia
    job gradstudent
    salary 42

    In the example above, the key-value pairs are:

    • age: 29
    • name: cecilia
    • job: gradstudent
    • salary: 42

The following diagram summarizes the entire MapReduce pipeline:

Mapreduce Diagram

Spark and EC2

Spark is a framework that builds on MapReduce. The AMPLab first developed this system to improve an open source implementation of MapReduce, Hadoop. In this lab, we will run Spark on Amazon EC2 instances, which will demonstrate how you can write programs that can harness parallel processing. EC2, or Elastic Compute Cloud, allows customers to rent servers for their own use. We will rent a couple servers on a hourly basis. (We are paying for this, so please do not waste resources.)

Yelp Academic Dataset Analysis

In this section, we introduce the framework for mappers and reducers in Spark. We will be analyzing the Yelp academic dataset that was used in the maps project.

Example: Counting Reviews

Our first exercise will be counting the number of reviews for each distinct restaurant in our dataset, which is a sequence of reviews. A review contains the following attributes:

  • review.text: The text of the review.
  • review.stars: The number of stars the user gave the restaurant. It is an integer from 1 to 5 (inclusive).
  • review.business_id: A unique restaurant identifier, such as 1CBs84C-a-cuA3vncXVSAw (La Val's Pizza).

To formulate this as a MapReduce problem, we need to define an appropriate mapper and reducer function.

Recall what the mapper does: for each review in our dataset, the mapper outputs a key-value pair. The mapper function takes in one review at a time. What should our key-value pairs be for our example?

  • key: To group our data by restaurants we will use the business_id of each review as our key
  • value: To count each line exactly once we will use 1 as our value

For example, for the following dataset

Review('I hate the food here. Avoid at all costs.', 1, 'id1')
Review('So good. #icanteven', 5, 'id1')
Review('This place is good. Sometimes.', 3, 'id2')
Review('What kind of restaurant only serves one type of pizza?!?', 2, 'id3')
Review('They make some awesome pizza here. One type a day.', 4, 'id3')

(notice there are 5 reviews); it then outputs a sequence of key-value pairs like this:

('id1', 1)
('id1', 1)
('id2', 1)
('id3', 1)
('id3', 1)

During the reducing step, we combine all values that share the same key together using the reducer. The reducer is a combining function that takes in two values that are associated with the same key. It should return a single value. Spark will repeatedly call the reducer is the values until they have been reduced to a single value. The output of the step will be a sequence of key-value pairs, where the value is the result of the reducing step.

('id1', 2)
('id2', 1)
('id3', 2)

Let's examine the code for our mapper and reducer. The code is provided in yelp/count_reviews.py. You can run the example by using

python3 run_spark.py --count_reviews

This will copy the scripts to the EC2 instances. It will then shard the data and process them in parallel. The output will be saved to a file spark-count-reviews.out/part-00000.

Question 1: Counting Stars

Now let's count how many reviews have a certain rating. For example, how many 4-starred ratings are there? Write a mapper and a reducer to calculate this. Think about what the keys and the values represent. You will need to edit the count_by_stars.py file.

def mapper(review):
"*** YOUR CODE HERE ***" return None # REPLACE THIS LINE
return (review.stars, 1)
def reducer(val1, val2):
"*** YOUR CODE HERE ***" return None # REPLACE THIS LINE
return val1 + val2

To test your code, run

python3 run_spark.py --count_by_stars

The output will be in spark-count-by-stars.out/part-00000. It should look something like this

(2, 1950)
(4, 6395)
(1, 1054)
(3, 3989)
(5, 4008)

Question 2: Counting Stars in SQL

Now let's use a simple SQL query to do the same computation: counting the number of reviews that have a certain rating. You will need to edit the count_by_stars_sql.py file.

def sql_query():
"*** YOUR CODE HERE ***" return 'YOUR SQL QUERY HERE'
return 'SELECT stars, count(*) FROM reviews GROUP BY stars'

To test your code, run

python3 run_spark.py --count_by_stars_sql

The output will be in spark-count-by-stars-sql.out/part-00000. It should look something like this

Row(stars=1, c1=1054)
Row(stars=2, c1=1950)
Row(stars=3, c1=3989)
Row(stars=4, c1=6395)
Row(stars=5, c1=4008)

Question 3: Analyzing Positivity

We will now try to analyze how "positive" certain words are. Positive words, such as "good", "excellent", "delicious", should appear in positive ratings more frequently than in negative ratings. We will rank positivity using a number scale from 1 to 5, where 1 represents negative words, and 5 represents positive words.

To do so, we will analyze each unique word in a review. The mapper should output a list of (key, value) pairs, where the key is a word, and the value is the number of stars associated with the review. For example:

Review('this is good', 5, 'id1') => [('this', 5), ('is', 5), ('good', 5)]
Review('this is bad', 2, 'id1')  => [('this', 2), ('is', 2), ('bad', 2)]

The reducer will take in a sequence of values associated with the same key. For example, one reducer call might take in all of the ratings associated with the word "this". The reducer will then combine all of the values by taking the average. Continuing the same example:

('this', [5, 2]) => 3.5
('is', [5, 2]) => 3.5
('good', [5]) => 5
('bad', [2]) => 2
def mapper(review):
    words = list(set(REGEX.sub(' ', review.text.lower()).split()))
"*** YOUR CODE HERE ***" return [(KEY, VALUE) for word in words]
return [(word, review.stars) for word in words]
def reducer(values):
"*** YOUR CODE HERE ***" return 0 # REPLACE THIS LINE
return avg(values)

To test your code, use

python3 run_spark.py --analyze_positivity

The output will be in spark-analyze-positivity-out/part-00000. It should look something like this

(u'awesome', 4.1554524361948957)
(u'so', 3.5585770983389682)
(u'into', 3.5268373245251858)
(u'salads', 3.666015625)

Question 4: Extra for Experts

We can now use the calculated word positivities to try to predict the star ratings for each review in the much larger Yelp academic dataset. For each review, we'll analyze the unique words in the text. We'll use the average positivity value of the words to rate each review.

You will need to modify the predict_ratings.py file. The mapper and reducer will be the same as in analyze_positivity.py. You will only need to define the predictor function. The predictor takes in a review and returns three values: the review's text, its actual rating, and your predicted rating.

def mapper(review):
    words = set(REGEX.sub(' ', review.text.lower()).split())
"*** YOUR CODE HERE ***" return # COPY FROM PREVIOUS QUESTION
return [(word, review.stars) for word in words]
def reducer(values):
"*** YOUR CODE HERE ***" return # COPY FROM PREVIOUS QUESTION
return avg(values)
def make_predictor(positivities): def predictor(review): words = set(REGEX.sub(' ', review.text.lower()).split())
"*** YOUR CODE HERE ***" return (KEY, VALUE) # REPLACE THIS LINE
return (review.text, review.stars, avg(positivities[w] for w in words if w in positivities))
return predictor

To test your code, use

python3 run_spark.py --predict_ratings

The output will be in spark-predict-ratings.out/part-00000. The first column contains the reviews' texts. The second column represents the actual ratings. The last column represents your predicted ratings. The closer the numbers the better. It should look something like:

(u"I'm a fan of soft serve ...", 5.0, 4.1235340109460514)
(u"Pretty great!  Okay, so this place ...)", 4.0, 3.6952811547386157)
(u'The Tale of the 4-Starred, Up Close and Personal Bar...', 4.0, 3.0033519553072625)

Terminating EC2 Instances

We hope you enjoyed the lab. Please terminate your EC2 instances to save costs. Use

python3 run_spark.py --destroy