Thursday, October 08, 2009

Parallelizing MapReduce

I was curious how hard it would be to take the MapReduce example I posted earlier and have it operate in parallel. Turns out, thanks to Google, not very hard at all.

First, I needed some sort of for-each function that operated in parallel. I could have written my own, but I was fortunate enough to find make-pool-for-each (from the plt-scheme mailing-list thread linked in the code below), which does exactly what I was looking for.
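
To give a feel for what a pool-based for-each does, here is a minimal sketch. It is not the make-pool-for-each from the mailing-list thread; the name make-simple-pool-for-each and its behavior (run at most `size` calls at once, return only when every item is done) are assumptions made for illustration.

```racket
#lang racket
;; Hypothetical stand-in for make-pool-for-each, not the original code.
;; Returns a for-each that runs at most `size` calls to `proc` at once
;; and returns only after every item has been processed.
(define (make-simple-pool-for-each size)
  (lambda (proc items)
    (define slots (make-semaphore size))      ; one token per free worker
    (define workers
      (for/list ([item (in-list items)])
        (semaphore-wait slots)                ; block until a slot frees up
        (thread (lambda ()
                  (dynamic-wind
                    void
                    (lambda () (proc item))
                    ;; give the slot back even if proc raises
                    (lambda () (semaphore-post slots)))))))
    (for-each thread-wait workers)))          ; join every worker

;; Usage: process four items, two at a time.
(define seen (make-hash))
((make-simple-pool-for-each 2)
 (lambda (x) (sleep 0.01) (hash-set! seen x #t))
 '(1 2 3 4))
```

The semaphore doubles as the pool: a worker thread only starts once a slot is free, so memory use stays bounded even for long lists.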

I then used PLT Scheme's built-in mutable hash table (make-hash) to collect the results, and was basically there. Here's what my code looks like:

#lang scheme
;;
;; A trivial implementation of MapReduce
;; Inspired by: http://programmingpraxis.com/2009/10/06/mapreduce/
;; Parallelized by: http://groups.google.com/group/plt-scheme/browse_thread/thread/8fe42711304e831f?pli=1
;;
(require "parallel.ss")

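;; A mapper takes one item and returns two values: a key and a value.
(define mapper/c (-> any/c (values any/c any/c)))
;; A reducer takes a key and two values for that key and combines the values.
(define reducer/c (-> any/c any/c any/c any/c))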

(provide/contract
 [map-reduce-worker-pool-size (parameter/c exact-positive-integer?)]
 [map-reduce (-> mapper/c reducer/c list? hash?)])

(define map-reduce-worker-pool-size (make-parameter 100))

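(define (map-reduce mapper reducer items)
  (let ([p-for-each (make-pool-for-each (map-reduce-worker-pool-size))]
        [store (make-hash)]
        [lock (make-semaphore 1)]   ; serializes updates to store
        [missing (gensym)])         ; marks "no value stored for this key yet"
    (p-for-each (lambda (item)
                  ;; Run the mapper outside the lock so slow mappers overlap.
                  (let-values ([(k v) (mapper item)])
                    (call-with-semaphore
                     lock
                     (lambda ()
                       ;; Store the first value for a key as-is; reduce only
                       ;; when an earlier value is already present.
                       (let ([found (hash-ref store k missing)])
                         (hash-set! store k
                                    (if (eq? found missing)
                                        v
                                        (reducer k found v))))))))
                items)
    store))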

And here's an example to show it works:

;; Simulate slow work by pausing for 0 to 4 seconds.
(define (stall)
  (sleep (random 5)))

;;
;; XXX - I really need to add in some timing code that shows the time
;; taken for this example is (max stall1 stall2 ...) and not
;; (+ stall1 stall2 ...)
;;
(define (example)
  (map-reduce (lambda (x) (stall) (values x 1))
              (lambda (k v1 v2) (+ v1 v2))
              (string->list "bananana")))
              
(example)
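
The timing check that the XXX comment asks for could be sketched roughly as follows. This is a standalone illustration, not part of the module above: it uses one plain thread per stall instead of make-pool-for-each, fixed stall times instead of (random 5) so the result is repeatable, and a hypothetical helper named timed.

```racket
#lang racket
;; Hypothetical helper: run thunk and return the elapsed wall-clock
;; time in milliseconds.
(define (timed thunk)
  (define start (current-inexact-milliseconds))
  (thunk)
  (- (current-inexact-milliseconds) start))

;; Fixed stalls (in seconds) so the comparison is repeatable.
(define stalls '(0.1 0.2 0.3))

;; One after another: total time is roughly (+ stall1 stall2 ...).
(define sequential-ms
  (timed (lambda () (for-each sleep stalls))))

;; One thread per stall, then wait for all of them:
;; total time is roughly (max stall1 stall2 ...).
(define concurrent-ms
  (timed (lambda ()
           (for-each thread-wait
                     (map (lambda (s) (thread (lambda () (sleep s))))
                          stalls)))))

(printf "sequential: ~a ms, concurrent: ~a ms\n"
        (round sequential-ms) (round concurrent-ms))
```

The same timed wrapper could be put around the call to (example) to confirm that the stalls overlap there too.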

I think this is also a good example of the power of parameters. For the above code I needed a value for the pool size. I didn't want to have it explicitly passed in, as I wanted that hidden from the average user. I didn't want to use a global variable, as that's a programming nightmare. And finally, I didn't want to leave it non-configurable. By using a parameter, the interested programmer can set the pool size at the start of their application or for an individual call, all without worrying about global-variable headaches.
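
Both ways of configuring a parameter can be shown in a few lines. To keep the sketch self-contained, pool-size stands in for map-reduce-worker-pool-size and run-job stands in for map-reduce; both names are made up for this example.

```racket
#lang racket
;; Stand-ins so the example runs on its own: pool-size plays the role of
;; map-reduce-worker-pool-size, and run-job the role of map-reduce.
(define pool-size (make-parameter 100))
(define (run-job) (pool-size))   ; just reports the pool size it would use

;; 1. Per-call override: in effect only inside the parameterize body.
(define scoped (parameterize ([pool-size 4]) (run-job)))   ; 4
(define after-scope (run-job))                             ; back to 100

;; 2. Application-wide setting: calling the parameter with a value
;;    changes it from here on.
(pool-size 16)
(define after-set (run-job))                               ; 16
```

The caller of run-job never has to pass the pool size along, which is the point: the setting travels with the dynamic context rather than through every function signature.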

I wonder what it would take to extend this to operate over the network. Hmm....
