Understanding MapReduce in MongoDB, with Node.js, PHP (and Drupal)

June 20, 2012

(Note: some of the details in this post may be outdated. For example, MongoDB now supports Aggregation queries, which simplify some of these use cases. However, the MapReduce concepts are probably still the same.)

MongoDB’s query language is good at extracting whole documents or whole elements of a document, but on its own it can’t pull specific items from deeply embedded arrays, or calculate relationships between data points, or calculate aggregates. To do that, MongoDB uses an implementation of the MapReduce methodology to iterate over the dataset and extract the desired data points. Unlike SQL joins in relational databases, which essentially create a massive combined dataset and then extract pieces of it, MapReduce iterates over each document in the set, “reducing” the data piecemeal to the desired results. The name was popularized by Google, which needed to scale beyond SQL to index the web. Imagine trying to build the data structure for Facebook, with near-instantaneous calculation of the significance of every friend’s friend’s friend’s posts, with SQL, and you see why MapReduce makes sense.

I’ve been using MongoDB for two years, but only started using MapReduce heavily in the last few months. MongoDB is also introducing a new Aggregation framework in 2.1 that is supposed to simplify many operations that previously needed MapReduce. However, the latest stable release as of this writing is still 2.0.6, so Aggregation isn’t officially ready for prime time (and I haven’t used it yet).

This post is not meant to substitute for the copious documentation and examples you can find across the web. Even after reading those, it took me some time to wrap my head around the concepts, so I want to explain them as I came to understand them.

The Steps

A MapReduce operation consists of a map, a reduce, and optionally a finalize function. Key to understanding MapReduce is understanding what each of these functions iterates over.

Map

First, map runs for every document retrieved in the initial query passed to the operation. If you have 1000 documents and pass an empty query object, it will run 1000 times.

Inside your map function, you emit a key-value pair, where the key is whatever you want to group by (_id, author, category, etc.), and the value contains whatever pieces of the document you want to pass along. The function doesn’t return anything, because you can emit multiple key-value pairs per map call, whereas a function can only return one result.

The purpose of map is to extract small pieces of data from each document. For example, if you’re counting articles per author, you could emit the author as the key and the number 1 as the value, to be summed in the next step.
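To make that concrete, here is a minimal sketch of two map functions that can be run outside MongoDB. The emit() stub, the sample documents, and the field names (author, tags) are assumptions for illustration; inside MongoDB, emit() is provided for you and the current document is bound to `this`.

```javascript
// Local stand-in for MongoDB's emit(), so the map functions can run in Node.js.
var emitted = [];
function emit(key, value) {
  emitted.push({ key: key, value: value });
}

// map runs once per document, with the document available as `this`.
function mapByAuthor() {
  emit(this.author, 1);
}

// map may also emit several pairs per document, e.g. one per tag.
function mapByTag() {
  var tags = this.tags || [];
  for (var i = 0; i < tags.length; i++) {
    emit(tags[i], 1);
  }
}

// Hypothetical sample documents.
var articles = [
  { author: 'alice', tags: ['mongodb', 'nodejs'] },
  { author: 'bob', tags: ['php'] },
  { author: 'alice', tags: [] }
];

articles.forEach(function (doc) { mapByAuthor.call(doc); });
var authorPairs = emitted;

emitted = [];
articles.forEach(function (doc) { mapByTag.call(doc); });
var tagPairs = emitted;

console.log(authorPairs);
console.log(tagPairs);
```

Note that the third article emits nothing from mapByTag, which is fine: map is free to emit zero, one, or many pairs per document.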

Reduce

The reduce function then receives each of these key-value(s) pairs, for each key emitted from map, with the values in an array. Its purpose is to reduce multiple values-per-key to a single value-per-key. At the end of each iteration of your reduce function, you return (not emit this time) a single variable.

The number of times reduce runs for a given operation isn’t easy to predict. (I asked about it on Stack Overflow and the consensus so far is, there’s no simple formula.) Essentially reduce runs as many times as it needs to, until each key appears only once. If you emit each key only once, reduce never runs. If you emit most keys once but one special key twice, reduce will run once, getting (special key, [ value, value ]).
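A rough way to picture this is the simulation below. It is only a sketch: simulateReduce and its batchSize parameter are hypothetical, and MongoDB's real batching is an internal detail. It does show the two points that matter: keys with a single value never reach reduce, and the number of reduce calls can change without changing the results.

```javascript
// A reduce function that sums numeric values.
function reduce(key, values) {
  var total = 0;
  for (var i = 0; i < values.length; i++) {
    total += values[i];
  }
  return total;
}

// Simplified, hypothetical simulation of the reduce phase (not MongoDB's actual scheduling).
function simulateReduce(pairs, reduceFn, batchSize) {
  var size = Math.max(2, batchSize); // batches of 1 would never shrink the list
  var grouped = {};
  pairs.forEach(function (pair) {
    (grouped[pair.key] = grouped[pair.key] || []).push(pair.value);
  });

  var results = {};
  var calls = 0;
  Object.keys(grouped).forEach(function (key) {
    var values = grouped[key];
    // Keys with a single value skip reduce entirely.
    while (values.length > 1) {
      var batch = values.splice(0, size);
      values.unshift(reduceFn(key, batch)); // the partial result is fed back in
      calls++;
    }
    results[key] = values[0];
  });
  return { results: results, calls: calls };
}

var pairs = [
  { key: 'alice', value: 1 },
  { key: 'bob', value: 1 },
  { key: 'alice', value: 1 },
  { key: 'alice', value: 1 }
];

var small = simulateReduce(pairs, reduce, 2);   // small batches: more reduce calls
var large = simulateReduce(pairs, reduce, 100); // one batch per key: fewer calls
console.log(small);
console.log(large);
```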

A rule of thumb with reduce is that the returned value’s structure has to be the same as the structure emitted from map. If you emit an object as the value from map, every key in that object has to be present in the object returned from reduce, and vice-versa. If you emit an integer from map, return an integer from reduce, and so on. The basic reason is that, as noted above, reduce doesn’t run at all if a key only appears once, so its output has to be interchangeable with map’s output. The results of an entire map-reduce operation, run back through the same operation, should return the same results (that way huge operations can be sharded and map/reduced many times). And the output of any given reduce function, plugged back into reduce (as a single-item array), needs to return the same value as went in. (In CS lingo, reduce has to be idempotent. The documentation explains this in more technical detail.)

Here’s a simple JS test, using Node.js’ assertion API, to verify this. To use it, have your mapReduce operation export its functions so a separate test script can import and test them:

var assert = require('assert');

// this should export the map, reduce, [finalize] functions passed to MongoDB.
var mr = require('./mapreduce-query');

// override emit() to capture locally
var emitted = [];

// (in global scope so map can access it)
global.emit = function(key, val) {
  emitted.push({key:key, value:val});
};

// reduce input should be same as output for a single object
// dummyItems can be fake or loaded from DB
mr.map.call(dummyItems[0]);

var reduceRes = mr.reduce(emitted[0].key, [ emitted[0].value ]);
assert.deepEqual(reduceRes, emitted[0].value, 'reduce is idempotent');

A simple MapReduce example is to count the number of posts per author. In map you could emit('author name', 1) for each document, then in reduce loop over the values and add each one to a total. Make sure reduce adds the actual number in each value rather than just adding 1 per value: a value may already be a partial total from an earlier reduce call, so counting 1 per value isn’t idempotent. For the same reason, you can’t just return values.length and assume each value represents one document.
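The difference is easy to see by reducing in two passes by hand. This is a sketch with made-up data; reduceInTwoPasses is a hypothetical helper that mimics MongoDB reducing one author’s values in two batches and then reducing the partial results together.

```javascript
// Correct: adds the actual numbers, so partial totals can be reduced again.
function reduceSum(key, values) {
  var total = 0;
  values.forEach(function (v) { total += v; });
  return total;
}

// Broken: assumes every value stands for exactly one document.
function reduceLength(key, values) {
  return values.length;
}

// Five emitted 1s for one author, reduced as a batch of three and a batch
// of two, followed by a final reduce over the two partial results.
function reduceInTwoPasses(reduceFn) {
  var partialA = reduceFn('alice', [1, 1, 1]);
  var partialB = reduceFn('alice', [1, 1]);
  return reduceFn('alice', [partialA, partialB]);
}

var correctCount = reduceInTwoPasses(reduceSum);    // 5
var brokenCount = reduceInTwoPasses(reduceLength);  // 2, because the last call only sees two values

console.log(correctCount, brokenCount);
```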

Finalize

Now you have a single reduced value per key, which gets run through the finalize function once per key.

To understand finalize, consider that this is essentially the same as not having a finalize function at all:

var finalize = function(key, value) {
  return value;
}

finalize is not necessary in every MapReduce operation, but it’s very useful for things like calculating averages. You can’t calculate the average in reduce because reduce can run multiple times per key, so no single iteration is guaranteed to see all the values for that key.
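One common way to handle averages, sketched here under assumptions, is to carry a running sum and count through reduce and only divide in finalize. The value shape ({ sum, count }), the avg field, and the word counts are all made up for illustration.

```javascript
// reduce keeps the same { sum, count } shape that map would emit.
function reduce(key, values) {
  var out = { sum: 0, count: 0 };
  values.forEach(function (v) {
    out.sum += v.sum;
    out.count += v.count;
  });
  return out;
}

// finalize runs once per key, after all reducing is done, so dividing is safe here.
function finalize(key, value) {
  value.avg = value.count > 0 ? value.sum / value.count : 0;
  return value;
}

// What map might emit for three of one author's posts (100, 200 and 600 words).
var emittedValues = [
  { sum: 100, count: 1 },
  { sum: 200, count: 1 },
  { sum: 600, count: 1 }
];

// Reduce in two passes, as MongoDB might.
var partial = reduce('alice', emittedValues.slice(0, 2));   // { sum: 300, count: 2 }
var reduced = reduce('alice', [partial, emittedValues[2]]); // { sum: 900, count: 3 }
var result = finalize('alice', reduced);                    // avg: 300

// Averaging inside each reduce pass instead would give a different, wrong answer.
var naiveAverage = ((100 + 200) / 2 + 600) / 2;             // 375

console.log(result, naiveAverage);
```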

The final results returned from the operation will have one value per key, as returned from finalize if it exists, or from reduce if finalize doesn’t exist.

MapReduce in PHP and Drupal

The MongoDB library for PHP does not include any special functions for MapReduce. Operations can be run as a generic database command, but that takes a lot of code. I found a MongoDB-MapReduce-PHP library on GitHub which makes it easier. It works, but hasn’t been updated in two years, so I forked the library and created my own version with what I think are some improvements.

The original library by infynyxx created an abstract class XMongoCollection that was meant to be sub-classed for every collection. I found it more useful to make XMongoCollection directly instantiable, as an extended replacement for the basic MongoCollection class. I added a mapReduceData method which returns the data from the MapReduce operation. For my Drupal application, I added a mapReduceDrupal method which wraps the results and error handling in Drupal API functions.

I could then load every collection with XMongoCollection and run mapReduce operations on it directly, like any other query. Note that the actual functions passed to MongoDB are still written in JavaScript. For example:

// (this should be statically cached in a separate function)
$mongo = new Mongo($server_name);      // connection
$mongodb = $mongo->selectDB($db_name); // MongoDB instance

// use the new XMongoCollection class. make it available with an __autoloader.
$collection = new XMongoCollection($mongodb, $collection_name);

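// the map and reduce functions are JavaScript, passed as strings (bodies omitted here)
$map = <<<MAP
  function() { /* emit(key, value) */ }
MAP;
$reduce = <<<REDUCE
  function(key, values) { /* return the reduced value */ }
REDUCE;

// build the operation (class name and constructor arguments are assumed from the library)
$mr = new MongoMapReduce($map, $reduce);
$mr->setScope(array('variable' => $variable));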

// 2nd param becomes the temporary collection name, so tmp_mapreduce_example. 
// (This is a little messy and could be improved. Stated limitation of v1.8+ not supporting "inline" results is not entirely clear.)
// 3rd param is $collapse_value, see code
$result = $collection->mapReduceData($mr, 'example', FALSE);

MapReduce in Node.js

The MongoDB-Native driver for Node.js, now an official 10Gen-sponsored project, includes a collection.mapReduce() method. The syntax is like this:

 
var mongodb = require('mongodb');
var db = new mongodb.Db(dbName, new mongodb.Server(mongoHost, mongoPort, {}));
db.open(function(error, dbClient) {
  if (error) throw error;  
  dbClient.collection(collectionName, function(err, collection) {
    collection.mapReduce(map, reduce, { 
        out : { inline : 1 },
        query: { ... },     // limit the initial set (optional)
        finalize: finalize,  // function (optional)
        verbose: true        // include stats
      },
      function(error, results, stats) {   // stats provided by verbose
        // ...
      }
    );
  });
});

It’s mostly similar to the command-line syntax, except in the CLI, the results are returned from the mapReduce function, while in Node.js they are passed (asynchronously) to the callback.

MapReduce in Mongoose

Mongoose is a modeling layer on top of the MongoDB-native Node.js driver, and in the latest 2.x release does not have its own support for MapReduce. (It’s supposed to be coming in 3.x.) But the underlying collection is still available:

var db = mongoose.connect('mongodb://dbHost/dbName');
// (db.connection.db is the native MongoDB driver)

// build a model (`Book` is a schema object)
// model is called 'Book' but collection is 'books'
mongoose.model('Book', Book, 'books');

...

var Book = db.model('Book');
Book.collection.mapReduce(...);

(I actually think this is a case of Mongoose being better without its own abstraction on top of the existing driver, so I hope the new release doesn’t make it more complex.)

In sum

I initially found MapReduce very confusing, so hopefully this helps clarify rather than increase the confusion. Please let me know if I’ve misstated or mixed up anything above.