Lamina channels and async tasks

19 Jan 2011

We’ve been playing with several different concurrency structures lately for the code we’re working on, and I blogged last week about some of the ideas. Shortly after that, Zach Tellman dropped me a line, and after some discussion he pushed some new ideas into Lamina in response.

We were already using Lamina channels in parts of our app, so we were pretty interested in the new code. Lamina channels are a nice queue representation (a wrapper around Clojure’s persistent queue implementation) that also includes the ability to close a channel and to receive messages from the queue via callbacks.
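For readers who haven’t used Lamina, here’s a minimal sketch of basic channel usage, based on the lamina.core API of this era (exact namespaces and arities may differ across versions):

```clojure
(use 'lamina.core)

(def ch (channel))                 ; a channel wraps a persistent queue
(receive ch #(println "got:" %))   ; callback fires for the next message
(enqueue ch 42)                    ; prints "got: 42"
(close ch)                         ; no further messages may be enqueued
(closed? ch)                       ; => true
```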

The newest additions let you easily take a function that works on values and turn it into a function that puts its results on a channel (this is the new async function). I think this function needs a different name, though, as the resulting function is not in any way asynchronous. Really, the purpose is to adapt a scalar function into a pipeline function, so a better name might be pipeline-fn or channel-fn or something like that.
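To make the idea concrete, here is a hypothetical channel-fn along the lines suggested above. This is an illustration of the adapter pattern, not Lamina’s actual async implementation; the name and the returned map shape are mine:

```clojure
(use 'lamina.core)

(defn channel-fn
  "Wrap scalar function f so that calling the wrapper enqueues
   (f args...) onto the returned channel instead of returning it."
  [f]
  (let [ch (channel)]
    {:channel ch
     :fn (fn [& args] (enqueue ch (apply f args)))}))

;; Usage: the wrapped function is still synchronous; only the
;; delivery of the result changes.
(let [{ch :channel f :fn} (channel-fn +)]
  (receive ch #(println "sum:" %))
  (f 1 2 3))   ; prints "sum: 6"
```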

More useful is the new future* function. Lamina already has this great concept of pipelines that are chained together in stages, connected by result channels. You can use future* to let parts of that pipeline run asynchronously across threads. Each future*’ed part of the pipeline will be notified by callback when its channel produces a result. This is pretty cool: it lets you spread a pipeline of computations across threads, where each computation is notified when the data it needs exists. This has many shades of dataflow and even actor models.
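A minimal sketch of a plain Lamina pipeline, for context (run-pipeline returns a result channel; future* wraps a stage so it runs on another thread, which isn’t shown here):

```clojure
(use 'lamina.core)

(run-pipeline 10           ; the initial value enters the first stage
  #(* % 2)                 ; each stage receives the prior stage's result
  #(+ % 1)
  #(println "result:" %))  ; prints "result: 21"
```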

However, after playing with this for a while, we could see that it wasn’t helping us solve our problem, for several reasons. The key issue for us was that future* is implemented using Clojure’s future and thus inherently blocks a thread in the agent I/O pool while waiting for the response. Even more importantly, while we want the “do work when your inputs exist” aspect of dataflow, we don’t necessarily want to do that execution immediately. Rather, we want that work to be scheduled for execution over a fixed pool of threads that never block. Note that there is really no reason future* (or a variant) could not be made to do exactly that; I hope we get back to that level of abstraction once we explore other parts of our problem.

We started down a path with pipelines and result channels but found that pipelines were not buying us much over basic channels (really, the error stream of a result channel was the only big benefit). For the sake of prototyping and simplicity, we’ve dropped back to raw channels that we work with directly. I think it’s likely we’ll come back to error channels in the future.

Another aspect of channels that was problematic in this respect is that they are designed primarily to process messages via asynchronous callbacks. The big problem with that for us is that those callbacks are executed in the thread adding a message to the queue, and that’s difficult for several reasons. Instead, we have switched to implementing tasks that run in a fixed thread and use dequeue to extract messages from the channel.
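A sketch of that task style: rather than processing messages in the enqueuer’s thread via callbacks, a task runs on a thread we control and pulls messages out itself. The (dequeue ch ::none) arity, a non-blocking dequeue that returns a default when the channel is empty, is an assumption based on the version we’re using; check your Lamina version for the exact signature:

```clojure
(use 'lamina.core)

(defn drain-task
  "Drain ch on the calling thread, applying handler to each queued
   message. Returns when the channel is empty (a real task would be
   rescheduled when new messages arrive)."
  [ch handler]
  (loop []
    (let [msg (dequeue ch ::none)]   ; assumed non-blocking dequeue
      (when-not (= ::none msg)
        (handler msg)
        (recur)))))

;; Run the task on a thread we control instead of the enqueuer's.
(def worker (java.util.concurrent.Executors/newSingleThreadExecutor))
(def ch (channel))
(enqueue ch :a)
(enqueue ch :b)
(.execute worker #(drain-task ch println))
```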

We also had some difficulties with pipelines, as they seem designed to start primarily from a single input channel (with the exception of some features like read-channel and read-merge). In our system we have a tree of processing nodes where data comes in through the leaves and flows up to the root, which means we always have multiple pipeline inputs.
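One way to express that shape with raw channels is a simple fan-in, forwarding each leaf channel into its parent with receive-all. This is a hand-rolled merge for illustration, not Lamina’s read-merge:

```clojure
(use 'lamina.core)

(defn merge-channels
  "Return a channel that receives every message from each input channel."
  [& inputs]
  (let [out (channel)]
    (doseq [in inputs]
      (receive-all in #(enqueue out %)))   ; forward leaf -> parent
    out))

(def leaf-a (channel))
(def leaf-b (channel))
(def root   (merge-channels leaf-a leaf-b))
(receive-all root #(println "root got:" %))
(enqueue leaf-a 1)   ; prints "root got: 1"
(enqueue leaf-b 2)   ; prints "root got: 2"
```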

I’d love it if we end up with some body of code that can be contributed back to Lamina or another project that depends on it. While the existing features in Lamina aren’t a perfect match for our problem, I think the high-level abstractions are quite good. I’m hopeful that once we get a prototype fully running, we can look at ways to reuse more of the existing feature set. If we could build some clean support for channel-based tasks that use channel dequeue rather than callbacks, and that go through a scheduler protocol where we could plug in our processing pool (currently fork/join), I think there would be something generic and reusable there.
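To give a flavor of the scheduler protocol idea, here is a hypothetical sketch: tasks go through a protocol so the backing pool (fork/join, a fixed executor, whatever) can be swapped out. The names are illustrative, not Lamina API:

```clojure
(defprotocol Scheduler
  (schedule [this task] "Submit a no-arg task for eventual execution."))

;; One implementation over a plain executor; since ForkJoinPool is
;; also an ExecutorService, a fork/join-backed scheduler could
;; implement the same protocol.
(defrecord ExecutorScheduler [^java.util.concurrent.ExecutorService pool]
  Scheduler
  (schedule [_ task] (.execute pool ^Runnable task)))

(def scheduler
  (ExecutorScheduler.
    (java.util.concurrent.Executors/newFixedThreadPool 4)))

;; A channel task would be scheduled whenever its input has work.
(schedule scheduler
          #(println "ran on" (.getName (Thread/currentThread))))
```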

I originally wrote this as an email to Zach but I figured others might find it interesting or have ideas.