Pure Danger Tech



25 Jan 2011

We’ve been using Lamina channels lately, and while testing stuff (and maybe in our implementation too) it would be helpful at times to have some way to encapsulate the eventual result of everything put on a channel. The java.util.concurrent.Future interface is a great match for this, and Clojure already has a bunch of functions for working with it (@/deref, future-done?, etc).

I took a stab at implementing a channel future as below. This implementation uses reify to create an anonymous object that implements Future and clojure.lang.IDeref (for @ support). It closes over two things: 1) the result (a seq of the remaining items in the channel) at the time the channel is sealed and 2) a CountDownLatch used to notify any threads waiting for the result.

The crux of this is adding a listener to the channel for the sealed event using on-sealed. The listener runs once the channel is sealed: it extracts the seq from the channel and sets it as the result, then counts down the latch, waking up all waiting threads.
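The publish-then-release pattern described above can be sketched without Lamina at all. This is only an illustration using an atom and a CountDownLatch; publish-and-wake is a hypothetical name, and Clojure futures stand in for the waiting threads.

```clojure
(import '(java.util.concurrent CountDownLatch))

(defn publish-and-wake
  "Hypothetical demo (not part of Lamina): starts n waiter threads that
   block on a latch, publishes value, then releases all waiters at once.
   Returns a vector of what each waiter saw."
  [n value]
  (let [result  (atom nil)
        latch   (CountDownLatch. 1)
        ;; Each waiter blocks in .await, then reads the published result.
        waiters (doall (repeatedly n #(future (.await latch) @result)))]
    (reset! result value)   ; publish the value first...
    (.countDown latch)      ; ...then wake every waiter
    (mapv deref waiters)))

(publish-and-wake 2 [1 2 3]) ; => [[1 2 3] [1 2 3]]
```

Setting the result before counting down matters: a waiter released by the latch is then guaranteed to see the value rather than nil.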

It took me quite a while to internalize the sealed/closed difference. The current model of a channel’s state is:

open -> CLOSE -> sealed -> EMPTY -> closed

The lower case words are states and the UPPER case words are events that move between them. The CLOSE event happens when a producer calls (close) on the channel. At that point, the channel is sealed and no more items can be added to the channel, but the existing items have not (necessarily) yet been removed from the channel by the consumer. When the consumer has read all data from the channel, it becomes closed. I think the confusing part of this is that the (close) action takes you to the sealed state, not the closed state (and fires on-sealed callbacks, not necessarily on-closed callbacks). I think the states and actions make sense, but I would prefer names like:

open -> CLOSE -> closed -> EMPTY -> drained

so that the close action takes you to a closed state and fires on-closed callbacks, while emptying the closed channel would go to a drained state and fire on-drained callbacks. Just a thought.
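To make the lifecycle concrete, here is a toy transition table using the current (sealed/closed) names. It is only a model of the diagram above, not how Lamina tracks state internally; transitions and step are hypothetical names.

```clojure
(def transitions
  ;; Toy model of the lifecycle above: [state event] -> next state.
  {[:open :close]   :sealed
   [:sealed :empty] :closed})

(defn step
  "Returns the next state, or the current state if the event does not apply."
  [state event]
  (get transitions [state event] state))

(step :open :close)                 ; => :sealed
(reduce step :open [:close :empty]) ; => :closed
```

Note that in this model (close) lands in :sealed, which is exactly the naming mismatch discussed above.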

Here’s the code:

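;; Assumes Lamina's channel functions are aliased as ch (presumably lamina.core).
(import '(java.util.concurrent CountDownLatch Future TimeoutException))

(defn channel-future
  "Takes a Channel and returns a Future that can be used to obtain
   a seq of the results on the channel when the channel is sealed.
   Cancellation is not supported."
  [channel]
  (let [result (atom nil)
        latch (CountDownLatch. 1)]
    ;; When the channel is sealed, capture its remaining items, then release waiters.
    (ch/on-sealed channel (fn []
                            (reset! result (ch/channel-seq channel))
                            (.countDown latch)))
    (reify
      Future
      ;; Check the latch, not @result: the result could still be nil
      ;; if the sealed channel held no items.
      (isDone [this]
        (zero? (.getCount latch)))
      (get [this]
        (.await latch)
        @result)
      (get [this timeout unit]
        ;; Per the Future contract, a timed get throws if the wait times out.
        (if (.await latch timeout unit)
          @result
          (throw (TimeoutException.))))
      (isCancelled [this]
        false)
      (cancel [this may-interrupt]
        (throw (UnsupportedOperationException. "Channel Futures don't support cancellation.")))
      clojure.lang.IDeref
      (deref [this] (.get this)))))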

Callers can use the built-in Clojure api for Futures as follows:

(deftest test-channel-future
  (let [c (ch/channel)
        f (channel-future c)]
    (is (future? f))
    (is (not (future-cancelled? f)))
    (is (not (future-done? f)))

    ;; Enqueuing alone does not complete the future.
    (ch/enqueue c 1 2 3)
    (is (not (future-done? f)))

    ;; Closing seals the channel, which completes the future.
    (ch/close c)
    (is (future-done? f))
    (is (= [1 2 3] @f))))

UPDATE: Zach provided a much simpler solution to this need. Use reduce* to collect all results from the channel and return when sealed, then use wait-for-message to block on the result, and make something you can carry around using future:

(defn channel-results [ch] 
  (reduce* conj [] ch)) 

(future (wait-for-message (channel-results ch)))