Pure Danger Tech



Playing with fork/join from Clojure

04 Jan 2011

I was playing around with fork/join (coming soon to JDK 7) recently and wrote a little adapter to make it easier to use from Clojure. At its heart, a fork/join pool is an Executor, just like the existing executors in java.util.concurrent. The Executor interface decouples task submission from task execution. In the standard java.util.concurrent executors, there is a single submission queue, and all the workers in the thread pool pull from that one shared queue.
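For comparison, here is a minimal Java sketch of that classic shared-queue design. The class and method names (SharedQueueDemo, sumOfSquares) are made up for illustration. The pool is built the same way Executors.newFixedThreadPool builds one, with every worker taking tasks from a single LinkedBlockingQueue. Calling submit() only enqueues work, and whichever worker is free picks it up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: a classic executor in which every worker
// thread pulls from the same single queue.
class SharedQueueDemo {
    static long sumOfSquares(int n, int threads) {
        // All worker threads take tasks from this one LinkedBlockingQueue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (int i = 1; i <= n; i++) {
                final long x = i;
                Callable<Long> task = () -> x * x;
                // Submission only enqueues the task; some worker executes it later.
                futures.add(pool.submit(task));
            }
            long total = 0;
            for (Future<Long> f : futures) {
                total += f.get(); // blocks until that task has been executed
            }
            return total;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

With many threads and many small tasks, every take() goes through that one queue, which is the contention point described next.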

The problem with this approach is that as you increase the number of threads, the shared queue becomes a source of contention (a bottleneck), since every worker has to synchronize on it to pull work. Doug Lea has been working on the fork/join framework for a decade now, building a new kind of executor pool. (See Cilk++ for another incarnation of this idea.) In fork/join, every worker thread has its own queue, which is really a deque (a double-ended queue). Each thread takes work only from the head of its own deque, so normally there is no contention there.

The problem then becomes how to keep all the threads busy when they’re no longer pulling from a single point of control. This is done by letting idle threads steal work from the *back* of another thread’s deque (again avoiding contention with the owner, which works at the head). This lets the threads “auto-balance” the workload. You also have to respect the dependencies between tasks: a task that forks subtasks must join them before it can combine their results. That’s why fork/join lends itself so naturally to divide-and-conquer algorithms.
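Here is a deliberately simplified, single-threaded model of one worker’s deque. WorkStealingModel is a hypothetical name, and ArrayDeque merely stands in for the lock-free, CAS-based arrays the real ForkJoinPool uses. The owner pushes and pops at the head, while a thief takes from the tail, so in the common case the two never touch the same element.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy, single-threaded model of one worker's deque (hypothetical class).
// The real ForkJoinPool uses lock-free arrays updated with CAS, not ArrayDeque.
class WorkStealingModel {
    private final Deque<String> deque = new ArrayDeque<>();

    // Owner: forking a subtask pushes it onto the head.
    void push(String task) {
        deque.addFirst(task);
    }

    // Owner: takes its next task from the head (most recently pushed, LIFO).
    String popOwn() {
        return deque.pollFirst();
    }

    // Thief: another worker steals from the tail (oldest task),
    // i.e. the opposite end from the one the owner uses.
    String steal() {
        return deque.pollLast();
    }

    int size() {
        return deque.size();
    }
}
```

Because the owner works LIFO, the oldest task sits at the tail. In a divide-and-conquer computation that is usually the largest remaining chunk, so a successful steal tends to hand the thief a substantial amount of work.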

I’ve written in the past about some of the exploratory work done by Rich Hickey to integrate fork/join processing into parallel operations in Clojure, which looks like very promising future work. In this post, I’m not doing anything nearly so deep; instead I’m just trying to allow you to create fork/join tasks and drop them in the queue easily from Clojure.

One annoyance with fork/join is that ForkJoinPool requires you to submit instances of ForkJoinTask if you want to be able to fork and join. Because ForkJoinTask is an abstract class rather than an interface, that means creating subclasses (most often of RecursiveAction or RecursiveTask) to play in the framework. That’s feasible with proxy in Clojure, but kind of annoying.
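As a sketch of what that subclassing looks like in plain Java, here is a divide-and-conquer array sum built on RecursiveTask. It assumes JDK 7+ and uses java.util.concurrent rather than jsr166y; SumTask and its threshold are illustrative choices, not from the original.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example of the kind of subclass ForkJoinPool expects.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // below this, just loop
    private final long[] data;
    private final int lo, hi; // half-open range [lo, hi)

    SumTask(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(data, lo, mid);
        left.fork();                                        // left half asynchronously
        long right = new SumTask(data, mid, hi).compute();  // right half on this thread
        return right + left.join();                         // wait for the left half
    }

    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }
}
```

The threshold keeps small subranges sequential, since forking a task costs more than adding up a handful of numbers.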

Instead, I created a Java shim that knows about IFn:

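package revelytix.federator.process;

// jsr166y is the standalone fork/join preview; on JDK 7+ the same
// class is java.util.concurrent.RecursiveTask.
import jsr166y.RecursiveTask;
import clojure.lang.IFn;

// Adapts any Clojure function into a fork/join task.
public class IFnTask extends RecursiveTask<Object> {
    private final IFn fn;

    public IFnTask(IFn fn) {
        this.fn = fn;
    }

    // Declared public (RecursiveTask's is protected), which lets
    // Clojure code call it directly.
    @Override
    public Object compute() {
        try {
            // Pass the task itself so the function can fork/join against it.
            return fn.invoke(this);
        } catch(RuntimeException e) {
            throw e;                        // unchecked: rethrow as-is
        } catch(Exception e) {
            throw new RuntimeException(e);  // checked: wrap, compute() can't declare it
        }
    }
}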

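Here we wrap an arbitrary Clojure IFn in a RecursiveTask that returns a generic Object. When the task runs, compute() calls the function with the task itself (this) as its argument, which allows the function to make calls like fork() and join() against the current task state. Checked exceptions are wrapped in a RuntimeException because compute() cannot declare them.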
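To show the same adapter idea without a Clojure dependency, here is a hypothetical plain-Java analog. FnTask is an invented name, and it assumes Java 8+, with java.util.function.Function standing in for IFn. Like IFnTask, it hands the task itself to the function and declares compute() public; the sumTask usage is only a toy example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.Function;

// Hypothetical plain-Java analog of IFnTask: java.util.function.Function
// stands in for clojure.lang.IFn (needs Java 8+).
class FnTask<T> extends RecursiveTask<T> {
    private final Function<FnTask<T>, T> fn;

    FnTask(Function<FnTask<T>, T> fn) {
        this.fn = fn;
    }

    @Override
    public T compute() {
        // Pass the task itself, as IFnTask does with fn.invoke(this).
        // Function.apply declares no checked exceptions, so nothing to wrap.
        return fn.apply(this);
    }

    // Toy usage: sum the integers in [lo, hi) by splitting the range in half.
    static FnTask<Long> sumTask(long lo, long hi) {
        return new FnTask<Long>(self -> {
            if (hi - lo <= 1) {
                return hi > lo ? lo : 0L;                // zero or one element left
            }
            long mid = (lo + hi) >>> 1;
            FnTask<Long> left = sumTask(lo, mid);
            left.fork();                                 // left half may be stolen
            long right = sumTask(mid, hi).compute();     // right half inline
            return right + left.join();                  // wait for the left half
        });
    }

    static long sum(long lo, long hi) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(sumTask(lo, hi));
        } finally {
            pool.shutdown();
        }
    }
}
```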

On the Clojure side, I made some helper functions for dealing with common calls to a pool:

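(ns revelytix.federator.process.fj
  (:import [revelytix.federator.process IFnTask]
           [jsr166y ForkJoinPool]))

;;;; -------- pool helpers ---------

(defn new-pool
  "Create a ForkJoinPool. With no args, parallelism defaults to the
  number of available processors; otherwise it is p."
  ([] (ForkJoinPool.))
  ([p] (ForkJoinPool. p)))

(defn shutdown-pool
  "Stop accepting new tasks; already-submitted tasks still run."
  [pool]
  (.shutdown pool))

(defn submit-task
  "Submit task for execution and return it as a Future."
  [pool task]
  (.submit pool task))

(defn invoke-task
  "Submit task and block until its result is available."
  [pool task]
  (.invoke pool task))

(defn execute-task
  "Arrange for task to run asynchronously; returns nil."
  [pool task]
  (.execute pool task))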
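In Java terms, the three submission helpers differ roughly as in this sketch (PoolCallsDemo and constant are hypothetical names; it assumes JDK 7+ java.util.concurrent). invoke blocks for the result, submit returns the task as a Future, and execute returns nothing, so you have to keep your own reference to the task to read its result later.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo of the three ways to hand a task to a ForkJoinPool.
class PoolCallsDemo {
    // A trivial task that just returns a constant.
    static RecursiveTask<Integer> constant(int value) {
        return new RecursiveTask<Integer>() {
            @Override
            protected Integer compute() {
                return value;
            }
        };
    }

    static int[] run() {
        ForkJoinPool pool = new ForkJoinPool(2); // explicit parallelism of 2
        try {
            // invoke: submit and block until the result is ready.
            int invoked = pool.invoke(constant(1));

            // submit: returns right away; the returned task is the Future.
            ForkJoinTask<Integer> submitted = pool.submit(constant(2));
            int fromFuture = submitted.get();

            // execute: fire-and-forget (void); keep the task to read its result.
            RecursiveTask<Integer> executed = constant(3);
            pool.execute(executed);
            int fromExecute = executed.join();

            return new int[] {invoked, fromFuture, fromExecute};
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```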

And some more for common calls to create tasks and fork/join/compute inside tasks:

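(defn new-task
  "Wrap f, a function of one arg (the task itself), in an IFnTask."
  [f]
  (IFnTask. f))

(defn fork-task
  "Schedule task to run asynchronously and return the task. Intended
  to be called from inside a running fork/join task."
  [task]
  (.fork task))

(defn join-task
  "Wait for task to complete and return its result."
  [task]
  (.join task))

(defn compute-task
  "Run task's compute method directly on the current thread."
  [task]
  (.compute task))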

As you can see, there’s nothing complicated in any of this, just wrapping a lot of Java calls. The only mildly interesting piece is probably new-task, which wraps a function in an IFnTask. (compute-task works only because IFnTask declares compute() public.)
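The task helpers map directly onto ForkJoinTask methods. This sketch (TaskCallsDemo and Child are hypothetical names) checks two details the Clojure code relies on: fork() returns the task itself, so the value of fork-task can be bound and joined later, and calling compute() directly on a fresh task simply runs it on the current thread.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo of fork / compute / join semantics inside a running task.
class TaskCallsDemo extends RecursiveTask<Boolean> {
    // A child task whose result is the thread that ran it.
    static class Child extends RecursiveTask<Thread> {
        @Override
        public Thread compute() { // public, like IFnTask.compute()
            return Thread.currentThread();
        }
    }

    @Override
    protected Boolean compute() {
        Child a = new Child();
        // fork() schedules the task and returns that same task object.
        boolean forkReturnsSelf = a.fork() == a;
        // compute() on a fresh task just runs it inline on this thread.
        boolean inline = new Child().compute() == Thread.currentThread();
        // join() waits for the forked child and returns its result.
        boolean joined = a.join() != null;
        return forkReturnsSelf && inline && joined;
    }

    static boolean check() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new TaskCallsDemo());
        } finally {
            pool.shutdown();
        }
    }
}
```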

And finally, here is the canonical Fibonacci example (yes, I am aware that this is a stupid way to implement Fibonacci) rendered in Clojure for fork/join consumption:

(declare fib-task)

(defn new-fib-task [n]
  (new-task (partial fib-task n)))

(defn fib-task [n task]
  (if (<= n 1)
    n
    (let [f1 (fork-task (new-fib-task (- n 1)))]
      (+ (compute-task (new-fib-task (- n 2)))
         (join-task f1)))))

(defn fib [pool x]
  (invoke-task pool (new-fib-task x)))

Here, new-fib-task is a helper that partially applies fib-task to n and wraps the result in a task; when the task runs, IFnTask supplies the remaining task argument. fib-task is the actual fork/join task body in Clojure. The key point is that f1 computes the (n-1) value, and once we call fork-task on it, that branch of the computation can execute in parallel. When we call join-task on it, we wait for it to finish and get its result. For the (n-2) value we also construct a new task, but call its compute method directly, effectively running that child task inline on the current thread. (Clojure evaluates the arguments to + left to right, so the inline computation happens before we block on the join.) I wrote it this way simply because I ported the example given in the RecursiveTask docs.
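For comparison, the Java version looks roughly like this. It is adapted from the Fibonacci example in the RecursiveTask Javadoc, with a hypothetical first helper added that computes F(0) through F(count - 1) in a single pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Adapted from the RecursiveTask Javadoc example: deliberately naive,
// exponential-time Fibonacci used only to exercise fork/join.
class Fibonacci extends RecursiveTask<Integer> {
    final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) return n;
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();                          // (n-1) branch may run in parallel
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();    // (n-2) branch inline, then wait
    }

    // Hypothetical helper: F(0) .. F(count - 1), all computed in one pool.
    static List<Integer> first(int count) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            List<Integer> out = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                out.add(pool.invoke(new Fibonacci(i)));
            }
            return out;
        } finally {
            pool.shutdown();
        }
    }
}
```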

The fib function needs a fork/join pool to run in; it invokes the top-level task in that pool and waits for the result. We can then create a pool and grab the first 20 Fibonacci numbers like this:

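(def pool (new-pool))
;; F(0) through F(19). map is lazy, so the values are computed only
;; when the sequence is realized (e.g. printed at the REPL).
(map #(fib pool %) (range 20))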

Hope that’s useful to somebody down the line…