Hadoop Streaming with Amazon Elastic MapReduce, Python and mrjob

In a previous rant about data science & innovation, I made reference to a problem I’m having at work where I want to classify roughly a quarter-billion URLs by predicted website content (without having to actually visit the website). A few colleagues have asked how you even go about starting to solve a problem like that, and the answer is massively parallel processing.

Attacking the problem using a local machine

In order to classify the URLs, the first thing that’s needed is a customized dictionary of words relevant to our company’s subject matter. When you have a corpus where the words are already delimited (such as a digitized book), finding the population of words is relatively simple: split the text on spaces & punctuation and you’re more or less done. With a URL, however, you have one continuous string with no word boundaries. One way to try to find the boundaries is the following Python:

import collections
import nltk

#Dictionary file that ships with Unix
internal_dict = open("/usr/share/dict/words")
#Stopwords corpus from NLTK
stopwords = nltk.corpus.stopwords.words('english')

#Build english_dictionary of prospect words
english_dictionary = []
for line in internal_dict:
    word = line.rstrip('\n')   #strip the trailing newline before comparing
    if word not in stopwords and len(word) > 4:  #make sure only "big", useful words included
        english_dictionary.append(word)

#How many words are in the complete dictionary?
len(english_dictionary)

#Import urls, stripping newlines
urls = [line.rstrip('\n') for line in open("/path/to/urls/file.csv")]

#Build counter dictionary
wordcount = collections.Counter()
for word in english_dictionary:    #Loop over all possible English words
    for url in urls:               #Loop over all urls in list
        if word in url:
            wordcount[word] += 1   #Once word found, increment its counter

The problem with approaching the word-searching problem in this manner is that you are limited to the power of your local machine. In my case, with a relatively new MacBook Pro, I can process 1,000 lines in about 19 seconds as a single-threaded process. At 250,000,000 URLs, that’s 4.75 million seconds…79,000 minutes…1,300 hours…55 days…roughly 1.8 months! And of course, that 1.8 months is for the data I have now, which is accumulating every second of every day. Clearly, just to build the custom dictionary of words, I’ll need to employ MANY more computers/tasks.
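For the curious, the back-of-envelope arithmetic works out like this (a quick Python sketch using the same numbers quoted above):

#Back-of-envelope extrapolation from the single-machine benchmark
seconds_per_1000_lines = 19            #measured on my MacBook Pro
total_urls = 250000000                 #roughly a quarter-billion URLs

total_seconds = total_urls / 1000.0 * seconds_per_1000_lines
print(total_seconds)                   #4,750,000 seconds
print(total_seconds / 60 / 60 / 24)    #~55 days of single-threaded processing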

Amazon Elastic MapReduce = Lots of Horsepower

One thing you might notice about the Python code above is that the two loops have no real reason to run serially; each comparison of a URL and a dictionary word can be made independently of all the others (a pattern often referred to as “embarrassingly parallel”). This type of programming pattern is well suited to running on a Hadoop cluster. With Amazon Elastic MapReduce (EMR), we can provision tens, hundreds, even thousands of computer instances to process the URL/dictionary-word comparisons and get our answer much faster. The one downside of using Amazon EMR to access Hadoop is that Hadoop jobs are natively written in Java. Luckily, there is a Python package called mrjob that packages your Python code as a Hadoop Streaming job automatically, so you don’t have to switch languages to get massively parallel processing.

Writing MapReduce code

The Python code above, which keeps a tally of words and their number of occurrences, is already a version of the MapReduce coding paradigm: looping through the URL/word comparisons is the “Map” portion of the code, and summing the per-word counts is the “Reduce” step. However, in order to use EMR, we need to modify the code to remove the loop over URLs:

from mrjob.job import MRJob

class MRWordCounter(MRJob):

    def mapper(self, _, line):
        #Custom English dictionary (full list truncated here for display)
        english_dict = ['aal', 'aalii', 'aam', 'aani'...'zythum', 'zyzomys', 'zyzzogeton']

        for word in english_dict:
            if word in line:
                yield word, 1

    def reducer(self, word, occurrences):
        yield word, sum(occurrences)

if __name__ == '__main__':
    MRWordCounter.run()

The reason we remove the loop over the lines of the URL file is that it is implicit in the EMR/Hadoop style of processing: we specify an input file when we submit the job, and EMR distributes the lines of that file across all of the Hadoop nodes, feeding them to the mapper one at a time. Essentially, our 250,000,000 lines of URLs become 1,000 tasks of 250,000 URLs each (assuming 125 nodes running 8 tasks each).
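One nice side effect of writing the job with mrjob is that you can sanity-check it locally before paying for any EMR time. Leaving off the -r emr flag makes mrjob run the script on your own machine against whatever input you give it (the sample filename below is just a placeholder):

python ~/Desktop/mapreduce.py urls_sample.txt > word_counts.txt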

Calling EMR from the Python command line

Once we have our mrjob code written, we can submit it to EMR from the command line. Here’s what an example invocation looks like:

python ~/Desktop/mapreduce.py -r emr s3://<s3bucket>/url_unload/0000_part_01 --output-dir=s3://<s3bucket>/url_output --num-ec2-instances=81

There are many more options available in the mrjob package, so I highly suggest that you read the documentation for the EMR options. One thing to also note is that mrjob uses a configuration file, mrjob.conf, to hold the various EMR options under sections it calls “runners”. Yelp (the maker of the mrjob package) has posted an example mrjob.conf file with the most common options. In this file you can specify your Amazon API keys, the type of instances you want to use (I use c1.xlarge spot instances for the most part), where your SSH keys are located, and so on.
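To give a flavor of the format, here’s a minimal sketch of what the EMR “runner” section of an mrjob.conf file might look like. The keys shown are among the commonly used options, the values are placeholders, and Yelp’s example file remains the authoritative reference:

#mrjob.conf is YAML; every value below is a placeholder
runners:
  emr:
    aws_access_key_id: YOUR_ACCESS_KEY
    aws_secret_access_key: YOUR_SECRET_KEY
    aws_region: us-east-1
    ec2_key_pair: your-keypair-name
    ec2_key_pair_file: ~/.ssh/your-keypair.pem
    ec2_instance_type: c1.xlarge
    num_ec2_instances: 20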

Results

In terms of performance, I have 8 files of 5 GB each of URLs (~17.5 million lines per file) that I’m running through the mrjob code above. The first file was run with 19 c1.xlarge instances, creating on average 133 mappers and 65 reducers, and took 917 minutes (about 3.14 seconds per 1,000 lines). The second file was run with 80 c1.xlarge instances, creating 560 mappers and 160 reducers, and took 218 minutes (about 0.75 seconds per 1,000 lines). So roughly four times as many instances led to roughly one-quarter of the run-time.

For the most part, you can expect roughly linear speedups as you add nodes to your EMR cluster. I know that at some point Hadoop will decide it no longer needs any more mappers/reducers, but I haven’t had the desire to find out exactly how many nodes I’d need to add to reach that point! 🙂


A Beginner's Look at Julia

Over the past month or so, I’ve been playing with a new scientific programming language called ‘Julia’, which aims to be a high-level language with performance approaching that of C. With that goal in mind, Julia could be a solution to the ‘multi-language’ problem of needing to move between R, Python, MATLAB, C, Fortran, Scala, etc. within a single scientific programming project. Here are some observations that might be helpful for others looking to get started with Julia.

Get used to ‘Git’ and ‘make’

While there are pre-built binaries for Julia, the pace of development is rapid enough that it’s best to build Julia from source. To keep up with the dozen or more code changes that land every day, you can clone the Julia GitHub repository to your local machine. If you use one of the GitHub GUIs, this is as easy as hitting the ‘Sync Branch’ button to pull in all of the newest code updates.
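If you prefer the command line to a GUI, the same workflow is just ordinary git (shown here for the Mac/Linux Terminal):

$ git clone https://github.com/JuliaLang/julia.git
$ cd julia
$ git pull    #run this periodically to grab the newest commits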

To install Julia, you need to compile the code. The instructions for each supported operating system are listed on the Julia GitHub page. For Mac users, use Terminal to navigate to the directory where you cloned Julia, then run the following command, where ‘n’ refers to the number of concurrent processes you want the compiler to use:

make -j n 

I use 8 concurrent processes on a 2013 MacBook Pro and it works pretty well; it’s certainly much faster than building with a single process. Note that the first time you run the make command, the build will take much longer than successive builds, as Julia downloads and builds all of the required libraries. After the first build, you can just run the make command with a single process, since the incremental code updates don’t take very long to build.

Package management is also done via GitHub. To add Julia packages to your install, you use the Pkg.add() function, with the package name in double-quotes.
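For example, to add the DataFrames package that shows up later in this post (and to refresh everything already installed):

#At the Julia prompt
Pkg.add("DataFrames")    #clones the DataFrames package from GitHub
Pkg.update()             #pulls updates for all installed packages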

Julia code feels very familiar

Text file import

Although the Julia documentation makes numerous references to MATLAB in terms of code similarity, Julia feels very familiar to me as an R and Python user. Take reading a .csv file into a dataframe and finding the dimensions of the resulting object:

#R: Read in 1987.csv from airline dataset into a dataframe
#No import statement needed to create a dataframe in R
airline1987 <- read.csv("~/airline/1987.csv")
dim(airline1987)
[1] 1311826      29

#Python: use pandas to create a dataframe
import pandas as pd
airline1987 = pd.read_csv("/Users/randyzwitch/airline/1987.csv")
airline1987.shape
Out[7]: (1311826, 29)

#Julia: use DataFrames to create a dataframe
using DataFrames
airline1987 = readtable("/Users/randyzwitch/airline/1987.csv")
size(airline1987)
(1311826,29)

In each language, the basic pattern is the same: call a ‘read’ function, specify the .csv filename, and let the function’s defaults handle the rest. I could also have specified other keyword arguments, but for the purposes of this example I kept it simple.
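As an illustration, here’s what the Julia call might look like with a couple of those defaults spelled out explicitly; the keyword names reflect my reading of the current DataFrames readtable() documentation, so double-check them against your installed version:

#Julia: same read as above, with two of the defaults made explicit
airline1987 = readtable("/Users/randyzwitch/airline/1987.csv",
                        separator = ',',
                        header = true)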

Looping

Looping in Julia is similar to other languages: Python requires consistent indentation for each level of a loop, with a colon after each loop or condition; and although you generally don’t write many loops in R, doing so requires parentheses and curly braces.

#Python looping to create a term-frequency dictionary

from collections import Counter

term_freq = Counter()
for word in english_dictionary:
    for url in url_list:
        if word in url:
            term_freq[word] += 1

#Julia looping to create a term-frequency dictionary

term_freq = Dict{String, Int64}()
for word in english_dictionary
    for url in url_list
        #search() returns the empty range 0:-1 when the word is not found
        if search(url, word) != (0:-1)
            term_freq[word] = get(term_freq, word, 0) + 1
        end
    end
end

If you’re coming from a Python background, you can see that there’s not a ton of difference between filling a dictionary in Python vs. Julia. The biggest differences are Julia’s use of the end keyword to close each block and the fact that Julia doesn’t currently have a convenience “Counter” type. R doesn’t natively have a dictionary type, but you can get a similar concept using the hash package.

Vectorization

While not required to achieve high performance, Julia also provides the functional-programming constructs of map-style functions and list comprehensions. In R, you use the *apply family of functions instead of loops to apply a function over the elements of a list. In Python, there are the map and reduce functions, as well as list comprehensions. In Julia, both of these approaches are available.

#Cube every number from 1 to 100

#Python map function
cubes = map(lambda x: x*x*x, range(1, 101))

#Python list comprehension
cubes = [x*x*x for x in range(1, 101)]

#R sapply function
cubes <- sapply(seq(1, 100), function(x) x*x*x)

#Julia map function
cubes = map((x) -> x*x*x, [1:100])

#Julia list comprehension
cubes = [x*x*x for x in [1:100]]

In each case, the syntax is just about the same to apply a function across a list/array of numbers.

A small, but intense community

One thing that’s important to note about Julia at this stage is that it’s very early. If you’re going to be messing around with Julia, there’s going to be a lot of alone-time experimenting and reading the Julia documentation. There are also several other resources including a Julia-Users Google group, Julia for R programmers, individual discussions on GitHub in the ‘Issues’ section of each Julia package, and a few tutorials floating around (here and here).

Beyond just the written examples though, I’ve found that the budding Julia community is very helpful and willing to answer questions. I’ve been bugging the hell out of John Myles White and he hasn’t complained (yet!), and even when code issues are raised through the users group or on GitHub, ultimately everyone has been very respectful and eager to help. So don’t be intimidated by the fact that Julia has a very MIT, Ph.D. feel to it…jump right in and migrate some of your favorite code over from other languages.

While I haven’t moved to using Julia for my everyday workload, I’m getting comfortable enough that I’m starting to consider using Julia for selected projects. Once the language matures a bit more, Julia Studio starts to approach RStudio in terms of functionality, and I get more familiar with the language in general, I can see Julia taking over for at least one, if not all, of my scientific programming languages.


Getting Started Using Hadoop, Part 3: Loading Data

In part 2 of the “Getting Started Using Hadoop” series, I discussed how to build a Hadoop cluster on Amazon EC2 using Cloudera CDH. This post will cover how to get your data into the Hadoop Distributed File System (HDFS) using the publicly available “Airline Dataset”. While there are multiple ways to upload data into HDFS, this post will only cover the easiest method, which is to use the Hue ‘File Browser’ interface.

Loading data into HDFS using Hue

[Image: ‘File Browser’ in Hue (Cloudera)]

Loading data into Hadoop using Hue is by far the easiest way to get started. Hue provides a GUI with a “File Browser” much like you normally see in Windows or OS X. The workflow here is to download each year of airline data to your local machine, then upload each file using the Upload -> Files menu drop-down.

Downloading files from one site on the Internet and then uploading them to somewhere else on the Internet is somewhat wasteful of time and bandwidth, but as a tutorial for getting started with Hadoop it isn’t the worst thing in the world. For those of you who are OS X users and comfortable using Bash at the command line, here’s some code so you don’t have to babysit the download process:

$ for i in {1987..2008}
> do
> curl http://stat-computing.org/dataexpo/2009/$i.csv.bz2 > $i.csv.bz2
> bunzip2 $i.csv.bz2
> done

Because you are going to be uploading a bunch of text files to your Hadoop cluster, I’d recommend zipping the files prior to upload. It doesn’t matter whether you use .zip or .gz files, with one key distinction: if you use a .zip file, you upload it using the “Zip Files” button in the File Browser; if you choose .gz, then you must use the “Files” option instead. Not only will zipping the files make the upload faster, it also means you only have to go through the upload process once (as opposed to hitting the upload button for each file). Using the .zip file upload process, you should see something like the following…a new folder with all of the files extracted automatically:

[Image: .zip file automatically extracted into a folder of files (Hortonworks)]

Next Steps

With the airline .csv files loaded for each year, we can use Pig or Hive to combine them into a master dataset with a defined schema. That will be the topic of the next tutorial.

