A few days ago, I started reading a really interesting book called "Data-Intensive Text Processing with MapReduce" (free version is available here).[1] In it, the authors discuss some common patterns for writing MapReduce jobs.
One of these patterns is in-mapper combining. In its most basic form, this strategy saves time by doing some aggregation within the mapper instead of emitting tons of (k, v) pairs that would bog everything down in the shuffle-and-sort phase.
Using the canonical word count example, Lin and Dyer give the basic MapReduce implementation:
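The gist of it: the mapper emits a (term, 1) pair for every term it sees, and the reducer sums the counts for each term. As a bare sketch in plain Python (my paraphrase, not the book's pseudocode):

def basic_mapper(doc_id, text):
    # emit a (term, 1) pair for every term in the document
    for term in text.split():
        yield term, 1

def basic_reducer(term, counts):
    # sum the partial counts for a single term
    yield term, sum(counts)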
We can implement this very quickly in Python using the mrjob
package. First, let's get a corpus to work on. We'll use a plain text version of "Great Expectations" from Project Gutenberg.
!wget --output-document=data/great-expectations.txt http://www.gutenberg.org/cache/epub/1400/pg1400.txt
We'll want to make everything lowercase, remove punctuation, and, for convenience, collapse multiple whitespace characters into a single space. We could write Python for this, but we may as well save the effort and chain a few quick command-line tools.
!cat data/great-expectations.txt | tr '[:upper:]' '[:lower:]' | sed 's/[^a-z\n]/ /g' | sed -e 's/\s\+/ /g' > data/great-expectations-cleaned.txt
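For reference, here's roughly the same cleanup in Python; this is just a sketch of what the pipeline above does, not something we actually run:

import re

# Rough Python equivalent of the shell pipeline above (a sketch, not what we
# actually ran): lowercase everything, turn non-letters into spaces, and
# collapse runs of spaces and tabs.
with open('data/great-expectations.txt') as f:
    text = f.read().lower()
text = re.sub(r'[^a-z\n]', ' ', text)
text = re.sub(r'[ \t]+', ' ', text)
with open('data/great-expectations-cleaned.txt', 'w') as f:
    f.write(text)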
Here's what the file looks like cleaned up:
!sed -n '3000,3010p' data/great-expectations-cleaned.txt
We're ready to set up our MapReduce job. Here's a really simple version adapted straight from the mrjob
documentation:
from mrjob.job import MRJob


class MRWordFreqCount(MRJob):
    """ extend mrjob's base class and define mapper and reducer """

    def mapper(self, _, line):
        """ splits a line of text by whitespace and returns (word, 1) """
        for word in line.strip().split():
            yield word.lower(), 1

    def reducer(self, word, counts):
        """ sum the counts """
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordFreqCount.run()
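Before running this on the full file, we can sanity-check the logic locally by pushing a single made-up line through the mapper and grouping the pairs by hand, the way the shuffle-and-sort phase would. This assumes an mrjob version that lets us instantiate the job class directly with args=[], as in mrjob's own testing docs:

from collections import defaultdict

# Quick local sanity check (not part of the original post): run one made-up
# line through the mapper, group the pairs by key, and hand them to the reducer.
job = MRWordFreqCount(args=[])
grouped = defaultdict(list)
for word, one in job.mapper(None, 'it was the best of times it was the worst of times'):
    grouped[word].append(one)
for word in ('it', 'was', 'times'):
    print(list(job.reducer(word, iter(grouped[word]))))  # e.g. [('it', 2)]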
The problem with doing it the straightforward way is that we will be emitting a (word, 1)
pair for every single word in the entire document. As you can imagine, emitting more pairs from the mappers will make the shuffle-and-sort phase longer. Let's see how many pairs we'll emit in our Great Expectations example:
!wc -w data/great-expectations-cleaned.txt
Here we're dealing with just under 200,000 words. That already sounds like a lot, but imagine how many more we'd be dealing with if instead of processing just one book we were trying to examine relative word counts in the entire Project Gutenberg corpus or the whole web! Even on a large Amazon EMR cluster, the shuffle-and-sort phase could take a while. For truly large jobs, we might even have to worry about running out of memory.
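To put a rough number on the potential savings, we can compare the total token count (one (word, 1) pair per token in the naive version) against the number of distinct words, which is the most an aggregating mapper would ever need to emit:

from collections import Counter

# Rough sense of the savings (not in the original post): the naive job emits
# one pair per token, while aggregating inside the mapper emits at most one
# pair per distinct word per mapper.
with open('data/great-expectations-cleaned.txt') as f:
    counts = Counter(f.read().split())
print(sum(counts.values()))  # total tokens: pairs emitted by the naive job
print(len(counts))           # distinct words: upper bound after aggregation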
Lin and Dyer describe a common optimization for this scenario, the in-mapper combining pattern. Here, we count up all the occurrences of a word within the mapper, and then emit pairs whose values have already been aggregated. Here's their pseudocode:
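Paraphrased in plain Python rather than their pseudocode, the pattern looks roughly like this: keep an associative array of counts alive across all the lines a mapper sees, and only emit once the mapper is finished.

def in_mapper_combining(lines):
    # accumulate counts across every line this mapper sees...
    counts = {}
    for line in lines:
        for term in line.split():
            counts[term] = counts.get(term, 0) + 1
    # ...and emit each (term, total) pair just once, at the end
    for term, total in counts.items():
        yield term, total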
We'll build on our previous implementation, this time using a plain old Python defaultdict
to implement our in-mapper combiner.
from mrjob.job import MRJob
from collections import defaultdict


class MRWordFreqCountInMapperCombiner(MRJob):
    """ word count with an in-mapper combiner """

    def mapper_init(self):
        """ set up our temporary map from keys to values """
        self.word_counts = defaultdict(int)

    def mapper(self, _, line):
        """ increment the appropriate words in our counter """
        for word in line.strip().split():
            self.word_counts[word] += 1

    def mapper_final(self):
        """ now emit all the (k, v) pairs we stored """
        for word, value in self.word_counts.items():
            yield word, value

    def reducer(self, word, counts):
        """ sum the counts """
        yield word, sum(counts)


if __name__ == '__main__':
    MRWordFreqCountInMapperCombiner.run()
We'll actually save this to a file and run it from the command line (running it from an IPython notebook is a pain).
!python in_mapper_combine.py < data/great-expectations-cleaned.txt > data/output.txt
Here's what the results look like:
!head -n 20 data/output.txt
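As a quick sanity check, the per-word counts in the output should sum back to the total that wc -w reported earlier. This assumes mrjob's default output protocol, which writes tab-separated, JSON-encoded key/value pairs:

import json

# Sanity check (not in the original post): with mrjob's default output
# protocol, each line is a JSON-encoded word, a tab, and a JSON-encoded count.
total = 0
with open('data/output.txt') as f:
    for line in f:
        word, count = line.rstrip('\n').split('\t')
        total += json.loads(count)
print(total)  # should match the wc -w figure above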
One thing Lin and Dyer make sure to point out is that in-mapper combining is not a silver bullet:
There are, however, drawbacks to the in-mapper combining pattern. First, it breaks the functional programming underpinnings of MapReduce, since state is being preserved across multiple input key-value pairs. Ultimately, this isn't a big deal, since pragmatic concerns for efficiency often trump theoretical "purity", but there are practical consequences as well. Preserving state across multiple input instances means that algorithmic behavior may depend on the order in which input key-value pairs are encountered. This creates the potential for ordering-dependent bugs, which are difficult to debug on large datasets in the general case (although the correctness of in-mapper combining for word count is easy to demonstrate). Second, there is a fundamental scalability bottleneck associated with the in-mapper combining pattern. It critically depends on having sufficient memory to store intermediate results until the mapper has completely processed all key-value pairs in an input split. In the word count example, the memory footprint is bound by the vocabulary size, since it is theoretically possible that a mapper encounters every term in the collection. Heaps' Law, a well-known result in information retrieval, accurately models the growth of vocabulary size as a function of the collection size—the somewhat surprising fact is that the vocabulary size never stops growing.
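To get a feel for the Heaps' Law point, here's a back-of-the-envelope sketch; the values K ≈ 30 and β ≈ 0.5 are typical figures quoted for English text, not numbers from the book:

# Back-of-the-envelope Heaps' Law sketch (K and beta are assumed typical values
# for English text, not figures from the book): vocabulary ~ K * n_tokens ** beta
K, beta = 30, 0.5
for n_tokens in (2e5, 1e8, 1e11):  # one novel, a big corpus, web scale
    print('%.0e tokens -> ~%d distinct terms' % (n_tokens, K * n_tokens ** beta))

Even at web scale the vocabulary keeps growing, which is exactly the memory concern the quote raises.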
[1] Lin, Jimmy, and Chris Dyer. "Data-intensive text processing with MapReduce." Synthesis Lectures on Human Language Technologies 3.1 (2010): 1-177.