How to run and understand MapReduce on your laptop in 10 minutes

by Daniel Rubio

You have heard or read about MapReduce and its power to process big data in the cloud. But you have no idea how it actually works or how to use it. MapReduce is a complex technology because it performs its work in parallel and uses distributed computing as its foundation. MapReduce is particularly more complex to setup than to use, which can be intimidating to first time users. You can simulate MapReduce's parallel & distributed on your own laptop to make MapReduce setup simple and also make it easier to understand MapReduce's core concepts.

MapReduce offers an approach to distribute big data processing tasks across multiple servers so tasks as a whole are done faster. This is the essence of MapReduce, even though navigating its ABC's and setting it up can be intimidating for first time users. MapReduce is a framework, so it's closely tied to programming tasks to achieve its work. In fact, this is how MapReduce gets its name: by applying map and reduce functions to data.

When to consider MapReduce for your data

Before going in head first to implement map and reduce functions on your big data, it's convenient to first answer the question, when is it even convenient to consider MapReduce for your big data and cloud projects ? After all, why even go to the trouble of using MapReduce, if you already have data in a database or if you have a data file ready to put into a database for application access.

If your current data is a means to an end (i.e. you're more interested in data produced by processing or grouping your current big data) then MapReduce represents an ideal solution to speed up processing times and reduce the size of your data. Although data in a database or a CSV file can be processed or grouped into smaller subsets to satisfy an application's requirements, processing or grouping data in these storage technologies can be time consuming, particularly if the data universe you're working with is into the Terabytes or millions of records. MapReduce is designed to work in parallel, which means it works faster for an equal processing or grouping task done on a regular database or CSV file.

This is not to say MapReduce is a 'magic bullet' to speed up processing on data applications. One of the biggest drawbacks of MapReduce is that it's not a storage system, but rather a batch system. This means that unlike a database or CSV file in which data can be accessed and processed directly from an application, MapReduce requires that you process data as a separate step and then place its results on a storage system (e.g. database or CSV file) so it can be accessed by an application.

There is in fact a fine line trying to strike a balance when it's convenient to use MapReduce or not. If in order to gain insight from a large data set the processing times for data stored on a database or CSV takes minutes or even hours, incorporating MapReduce into an application's workflow can provide a significant performance boost, even when MapReduce implies using additional software and an extra ETL(Extract-Transform-Load) step from storage system to MapReduce and back to a storage system. Table 1 contains a series of scenarios under which to consider MapReduce, including the type of source data, the functionality required by the map and reduce functions, as well as the end result.

Table 1.- MapReduce scenarios - Source data, map & reduce functionality and end results

Source data (Into millions of records or TB in size)Map function functionalityReduce function functionalityEnd result
Company departmental sales historyMap departments to salesReduce sales to grand total by departmentGet total sales for each department
Advertising banners to clicksMap banners to clicksReduce clicks to total by bannerGet total clicks for each banner
Earthquake locationsMap locations to earthquakesReduce earthquake locations to total countGet total earthquakes for each location
Historic Olympic medal winnersMap countries to medalsReduce medals to total count by countryGet total medals for each country
*Web page linksMap web pages to linksReduce links to total by web pageGet total links per web page
* This scenario is similar to how Google initially calculated the relevance for its search results.

Although a large ecosystem of tools and products have grown around MapReduce, often hiding some its core building blocks (e.g. map and reduce functions) in favor of more friendlier approaches -- many of which I have to say often lead to the perceived complexity of MapReduce -- at its core MapReduce is just this: a batch process where two custom functions are applied to big data sets.

So no matter how 'turn-key' a MapReduce solution offered by a cloud application appears to be (e.g. 'a one click solution' or 'a few dollars'), it's vital that you first firmly grasp MapReduce's core functionality, which is why the next section walks you through how to use MapReduce on your laptop in less than 10 minutes.

Running MapReduce on your laptop in less than 10 minutes

Since MapReduce is a framework, it's offered in a variety of programming languages and by many different vendors. To get you started with MapReduce I'll use the open source Python implementation called mincemeat. Among MapReduce implementations, mincemeat is one of the simplest to install. And considering some MapReduce implementations can take hours to install, mincemeat is a good choice if you're starting out with MapReduce.

You can get mincemeat at https://pypi.python.org/pypi/mincemeat or this adapted mincemeat version I created for Python 3. Once you download mincemeat's source code from either location, unzip the content's of the file and proceed to install the source as any Python package (e.g. execute python setup.py install inside the main directory). Once you setup mincemeat on your laptop, you can proceed to design a MapReduce task.

Besides being easy to install and lightweight, mincemeat also doesn't require you to set up multiple servers right after installation like most MapReduce implementations -- although mincemeat does support distributed servers. In addition, mincemeat also keeps configuration and dependencies to a minimum, with no need to setup other supporting software like daemons or file systems which can be off putting to newcomers.

Listing 1 illustrates a MapReduce task built around mincemeat that counts locations (i.e. city.state and country) present in a CSV file -- you can get the source CSV file (~15MB) at AviationData.csv.

Listing 1 - MapReduce task to count location occurrences, from source CSV file
#!/usr/bin/env python
import mincemeat
import csv 
import operator

# Define map function
def map_bigdata(k, v):
    for field in v:
        if field == '':
            continue
        yield field.upper(),1

# Define reduce function
def reduce_bigdata(k, ones):
    return len(ones)

# Read source data from CSV file, use csv module
# NOTE: You can get the AviationData.csv file (~15MB) at https://www.cloudstorageoptions.com/static/data/AviationData.csv
allrows = csv.reader(open('AviationData.csv', 'rb'), delimiter=',')
# Define dictionary placeholder for source data 
source_data = {}
# Loop over source data
for index,row in enumerate(allrows):
    # Generate dictionary, with row key index and value from city.state and country column
    source_data[index] = [row[4],row[5]]

# Initiate mincemeat master server 
s = mincemeat.Server()

# Assign datasource to mincemeat (can be any dictionary-like object)
s.datasource = source_data
# Assign map function to mincemeat
s.mapfn = map_bigdata
# Assign reduce function to mincemeat 
s.reducefn = reduce_bigdata

# Run master server
results = s.run_server(password="bigdata")

# Results are unordered dictionary, so Sort items in dictionary for highest counts
sorted_results = sorted(results.iteritems(), key=operator.itemgetter(1))
# Rever dictionary so highest counts are first elements
sorted_results.reverse()
# Loop over first 10 highest counts 
for freq in sorted_results[:10]:
    # Print each item 
    print(freq)

As you can likely tell by the first line #!/usr/bin/env python, listing 1 in its entirety is a Python script. That said, place the contents of listing 1 in a file called laptop_mapreduce.py to execute the MapReduce task like a regular Python script (e.g.python laptop_mapreduce.py) and also ensure you have the source CSV file AviationData.csv in the same directory.

After the first script line, we import three modules: mincemeat for MapReduce operations, csv to get data from a CSV file and operator to sort MapReduce results. Next, you'll see the methods map_bigdata and reduce_bigdata which are the Map and Reduce functions through which the data will run -- I'll describe these methods shortly in the MapReduce workflow.

The csv.reader statement is used to load data from a CSV file -- in this case the file named AviationData.csv -- located in the same directory as the Python script. The CSV data is loaded as a Python list by the csv module. Mincemeat expects a Python dictionary as it's source data, therefore we create a dictionary reference source_data. Next, a loop is done on the entire contents (i.e. rows) of the CSV data, Python's enumerate is used to gain access to the loop index. On each iteration, the index variable contains the loop count and the row variable an actual CSV record (e.g.20120423X32944, Accident, WPR12LA181, 04/22/2012, Monmouth.OR, United States, 44.841944). On each iteration, the index is used as the key to the source_data dictionary and a list with the fourth and fifth elements of each row is used as the dictionary value.

After the previous sequence, the source_data dictionary used as the source data for mincemeat has contents like the following: {1:['Monmouth.OR','United States'],2:['Toronto.Canada','Canada'],3:['Fairbanks.AK','United States]}.

Next, we declare a mincemeat master server instance using mincemeat.Server() and assign it to the reference s. The s reference is important because it's also used to assign source data, as well as the Map and Reduce methods applied to the data. The statement s.datasource = source_data assigns the source_data reference to the mincemeat master server, s.mapfn = map_bigdata assigns map_bigdata as the map method to the mincemeat master server and s.reducefn = reduce_bigdata assigns reduce_bigdata as the reduce method to the mincemeat master server.

Finally, the mincemeat master server is started using the statement s.run_server(password="bigdata") and the output for the MapReduce task is sent to the results reference. Next, lets explore the actual Map and Reduce methods.

The Map method called map_bigdata receives two input parameters, k for each dictionary key in the source data and v for each dictionary value in the source data. Based on the source_data dictionary, this means the map method is invoked multiple times with each dictionary element (e.g. map_bigdata(1,['Monmouth.OR','United States']),map_bigdata(2,['Toronto.Canada','Canada']),map_bigdata(3,['Fairbanks.AK','United States'])). The map_bigdata method then loops over each field value and if the value is an empty string skips to the next iteration. If the field is not empty, the yield keyword is called with the field converted to upper case using the Python string method upper() -- in this case either a city.state or country -- along with a 1 to indicate one occurrence of the field -- see the sidebar in case you're unfamiliar with the Python yield keyword.

Python yield keyword, generator methods and return statement

By using the yield keyword, a method becomes what is known as a Python generator. A generator is a method that produces a sequence of results instead of a single value. Python methods typically use the return keyword to indicate the final result and termination of the method's logic. By using the yield keyword, a method continues to accumulate results until its workflow (i.e. caller) is finished.

Given the way yield works, it's particularly suited for how the MapReduce works, this fact will become more evident as I describe the MapReduce workflow in the following paragraphs.

The outcome of the map_bigdata method is a Map of values generated from the source data. Given how the yield keyword works -- see previous sidebar -- map_bigdata is called recursively for every element in the source data, until it produces a sequence of values. This sequence of values or Map is then passed as the input to the Reduce method. Listing 2 illustrates the results of the map_bigdata for a certain set of source data.

Listing 2 - MapReduce output results from Map method
# Source data for mincemeat 
{1:['Monmouth.OR','United States'],2:['Toronto.Canada','Canada'],3:['Fairbanks.AK','United States'],4:['Janesville.WI','United States']}
# Final output map after calling map_bigdata in Listing 1 with previous source data 
[('CANADA',[1]),('JANESVILLE.WI',[1]),('TORONTO.CANADA',[1]),('MONMOUTH.OR',[1]),('UNITED STATES',[1, 1, 1]),('FAIRBANKS.AK',[1])]

Notice in listing 2 how the source data is converted when it passes through the map_bigdata method. For example, the value United States is converted to uppercase and for each occurrence in the source data a 1 is added to the list. Since Canada occurs only once in the source data, the output appears as ('CANADA',[1]). The reason behind the uppercase conversion and 1 is because the map_bigdata uses yield field.upper(),1 and because yield accumulate results, this causes a 1 to be added each time the field ocurrs.

Once the map_bigdata method finishes, its results are passed individually to the Reduce method called reduce_bigdata. Based on the output in listing 2, this means the reduce method is invoked multiple times with each element (e.g. reduce_bigdata('CANADA',[1]),reduce_bigdata('JANESVILLE.WI',[1]),reduce_bigdata('UNITED STATES',[1, 1, 1])). Turning our attention back to listing 1, we can see the reduce_bigdata method just returns the length of the its second argument, which in this case corresponds to the number of times each value (i.e. city.state or country) ocurrs.

After the reduce_bigdata method is applied to each element in the Map, the MapReduce task finishes, with the results assigned to the results reference. The final lines are standard Python sorting and looping sequences to get the top results of the results MapReduce task. Listing 3 illustrates the execution of the mincemeat MapReduce Python script and its output.

Listing 3 - Mincemeat MapReduce execution and output
$ python laptop_mapreduce.py 
# WAITS_FOR_MAP_REDUCE_WORKERS
('UNITED STATES', 68230)
('ANCHORAGE.AK', 483)
('ALBUQUERQUE.NM', 230)
('CHICAGO.IL', 228)
('HOUSTON.TX', 227)
('MIAMI.FL', 226)
('FAIRBANKS.AK', 204)
('CANADA', 181)
('BAHAMAS', 179)
('PHOENIX.AZ', 173)

As illustrated in listing 3, the execution of the MapReduce script -- contained listing 1 -- is done as any other Python script. The output is a list with the 20 highest city.state or country counts found in the selected CSV file's columns. After invoking python laptop_mapreduce.py you'll notice the Python script waits. In fact, it will wait endlessly until a MapReduce worker is activated. Listing 4 illustrates how to start a MapReduce worker with mincemeat.

Listing 4 - Start mincemeat MapReduce worker
mincemeat.py -p bigdata 127.0.0.1

The command in listing 4 starts a MapReduce worker on your laptop that connects to the master server at the I.P address 127.0.0.1 I.P address and uses the password bigdata as a connection parameter. The I.P address 127.0.0.1 is the loop-back address and always points to the local machine, where as the password is hard-coded in the MapReduce script -- in listing 1 -- on the line s.run_server(password="bigdata"). Basically listing 4 says: Start a MapReduce worker to work on the master server at 127.0.0.1 and use the bigdata password to make the connection. Once the MapReduce worker is started, the logic in listing 3 proceeds and produces its output. I'll explain the reason behind this design of a MapReduce worker in the next section.

That's it, you've successfully run a MapReduce task on your laptop. Now that you've finished this MapReduce exercise, you might be left wondering what's so special about MapReduce ? After all, couldn't a regular SQL query against a database with the same data do the same thing ? Or a script -- written in a series of programming languages -- executed against a CSV file with the same data do the same thing too ? The short answer to both these questions is yes, but the longer answer is no. I didn't discuss two important features about MapReduce to make the concepts of Map and Reduce functions crystal clear, but now that you have a firm grasp of these functions, it's time to move on to MapReduce's distributed and parallel nature.

Open ended logic is allowed for Map and Reduce functions

Keep in mind you can make use of any construct inside the Map and Reduce functions. In the previous example, I only used some of the input parameters and a single loop and conditional, but you can leverage any input parameters, loops, conditionals or other Python syntax. As far the mincemeat MapReduce implementation is concerned, the important thing is the source data be provided as a Python dictionary -- which is the input to the Map method -- and the result returned by the Map method is formatted to be the input for the Reduce method.

Understanding the distributed and parallel nature of the MapReduce architecture

There's an important design concept described toward the end of the last exercise that makes MapReduce more powerful than what can be perceived from analyzing the MapReduce script in listing 1: The use of a MapReduce worker. MapReduce is designed to distribute its workload across multiple servers or MapReduce workers. In the previous exercise you only created a single MapReduce worker on the same machine as the master server, but it's also possible to create multiple MapReduce workers on different machines so the workload is done in parallel.

The ability to do parallel processing is what makes MapReduce a more powerful alternative for processing or grouping big data vs. doing the same task against data on a regular database or CSV file. And it's both the Map and Reduce functions that can be delegated to different workers to expedite the processing of the MapReduce task as a whole. However, in order to do parallel processing of both the Map and Reduce functions across multiple workers, you also add complexity to the system because it becomes distributed.

Nevertheless, one of the beauties of MapReduce is that you don't need to worry -- too much at least -- about provisioning your Map and Reduce functions to work in a distributed environment. As you saw in the last section, there wasn't anything specific to distributed or parallel processing in either the Map or Reduce functions, most of these details are taken care of by the MapReduce framework itself 'under the hood'. To give you an idea of the provisioning the MapReduce framework has to cover, table 2 contains what is known as the 8 fallacies of distributed computing.

Table 2.- Eight fallacies of distributed computing
  1. The network is reliable
  2. Latency is zero
  3. Bandwidth is infinite
  4. The network is secure
  5. Topology doesn't change
  6. There is one administrator
  7. Transport cost is zero
  8. The network is homogeneous

These guiding principles in table 2 are fallacies because designers of distributed computing applications often assume they're true. In reality, if multiple servers are interacting with one another and you want to successfully achieve a common goal, you have to assume the antithesis of these principles: The network is not reliable, latency in never zero,...., the network won't be homogenous.

And it's precisely these last principles that MapReduce is designed to take care of: What if a MapReduce worker suddenly becomes unreachable ? What if a disk on a MapReduce worker suddenly fails ? What if a new MapReduce worker is added in the middle of a task ? All these questions have to be answered and fulfilled in order for a distributed system to work correctly. In the case of MapReduce, it's the master MapReduce server which distributes the workload to workers and keeps track of successful or failed tasks, where as the workers do the actual processing and maintain communication with the master to report progress.

In a previous paragraph, I mentioned how with MapReduce you don't need to worry 'too much' about provisions for parallel and distributing computing. Let me elaborate on the 'too much' remark. While MapReduce itself is designed so tasks don't require explicit knowledge or code for parallel or distributed environments, running a full-fledged MapReduce cluster with a master server and multiple workers can require a lot of supporting software to make MapReduce work correctly. This software can include: daemons to establish communication between servers, distributed file systems to share data between servers, monitoring services to detect load & health of workers, among other things.

It's due to this supporting software required alongside MapReduce that some MapReduce frameworks can take hours to install -- as I mentioned at the outset. Nevertheless, in order to truly achieve the full potential of MapReduce, it's necessary to deal with both the installation and configuration of MapReduce itself, as well as some of this other supporting software which is more in tune with system administration than actual application development.

Fortunately, with the emergence of cloud computing providers, the effort required to install and configure a full-fledged MapReduce cluster has dropped considerably. Many providers make this process as easy as uploading your big data sets and with a few clicks you can have a master server along with multiple workers running in a matter of minutes. And on top of the ease with which you can get started, there's also the fact that most providers offer a 'pay as you use' model where there's no need to make a strong upfront investment in hardware or software.

About the author: Daniel Rubio

Daniel Rubio

Daniel started working with technology fresh out of school for a Fortune 500 company, later on he moved on to doing contract work as an independent technology consultant. Throughout the years, he's continued to work at the code level -- as a developer and architect -- CTO-level advisor on platform selection strategies, professional technical writer and entrepreneur.

Daniel has been CTO/technical founder for sites that include: cleandatasets.com -- a data cleansing platform; videoformatter.com -- a video processing engine; and awardfun.com -- an award reference site for film, music and television. Daniel's writing experience spans multiple articles and books, with his early years focusing on Java, Linux and Open source topics and his more recent focus being Python/Django, Cloud computing and JavaScript topics.