
problem statement

Many real-world problems can be reduced to handling queues of messages. The messages are often ordered; it's not enough to define a single handler for all messages, since one message can dictate how future messages are handled. These messages also often represent future side effects; once they are modified, filtered, and accumulated, they will generally be sent over the network, printed to the screen, or written to disk.

It's this last fact that points to a crucial difference between these message queues and Clojure sequences: adding a message to the queue is not always transactionally safe. However, this doesn't mean that messages can never be transactionally enqueued. If enqueuing the message doesn't directly result in a side effect, transactions are perfectly fine.

In other words, transactions can safely encompass the modification, filtering, and accumulation of messages, as long as there's a clear separation between that process and using the end result to create side effects. A good solution should maximize the extent to which this is possible, and minimize the potential for mistakes.
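
As a minimal sketch of that separation, assuming a plain ref is used as the holding area: the names `outbox`, `enqueue!`, and `drain!` are hypothetical, and `println` stands in for whatever real side effect the messages represent.

```clojure
;; Hypothetical holding area for messages that have not caused side effects yet.
(def outbox (ref []))

(defn enqueue!
  "Appends msg to the outbox. Touches only a ref, so a retry is harmless."
  [msg]
  (dosync (alter outbox conj msg)))

(defn drain!
  "Atomically removes and returns every pending message."
  []
  (dosync
    (let [pending @outbox]
      (ref-set outbox [])
      pending)))

;; Modification, filtering, and accumulation all happen inside one transaction.
(dosync
  (enqueue! {:id 1 :body "hello"})
  (enqueue! {:id 2 :body ""})
  (alter outbox #(vec (remove (comp empty? :body) %))))

;; The side effect happens only after the transaction has committed.
(def sent (drain!))
(doseq [m sent]
  (println "sending" m))
```

The nested `dosync` calls inside `enqueue!` join the enclosing transaction, so nothing is visible to other threads until the outer block commits.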

Something which satisfied all of the above would be a good candidate for inclusion in contrib.

some possible approaches

pure functions (a la Ring)


Pros:
  • Leverages existing operators for composing and applying functions
  • Easy to understand and reason about; hides away side effects entirely

Cons:
  • Assumes simple call and response model for communication
  • Works best when messages are unordered; one message cannot affect the handling of another without the introduction of state
  • Works best when response is immediate
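
A rough illustration of the call-and-response style, written in the spirit of Ring but not using the Ring library itself; `handler`, `wrap-upper-case`, and `app` are hypothetical names.

```clojure
;; A handler is a pure function from a request map to a response map.
(defn handler [request]
  {:status 200
   :body   (str "hello " (:name request))})

;; Middleware is an ordinary higher-order function that wraps a handler.
(defn wrap-upper-case [handler]
  (fn [request]
    (update-in (handler request) [:body] #(.toUpperCase ^String %))))

(def app (wrap-upper-case handler))

(def response (app {:name "world"}))
```

Each request is answered immediately and independently of every other one, which is exactly where the model stops fitting ordered or delayed messages.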

blocking lazy-seqs


Pros:
  • Leverages existing operators for sequences
  • Widely understood abstraction

Cons:
  • Immutability doesn't make it safe: neither LinkedBlockingQueues nor queues of promises are transactionally safe to enqueue into (if deliver is used in a transaction that retries, it will throw an exception the second time around)
  • Must manually allocate one thread per consumer
  • No concept of timeout, consumer threads can starve forever
  • Somewhat leaky abstraction; (next seq) takes a predictable amount of time pretty much everywhere else, and holding onto the head of the seq is a non-obvious trap to avoid
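
For concreteness, one way such a blocking seq might be built on a LinkedBlockingQueue. This is a sketch: `queue-seq` and the `::done` sentinel are assumptions made for the example, not an existing API.

```clojure
(import 'java.util.concurrent.LinkedBlockingQueue)

(defn queue-seq
  "Lazy seq over q. Realizing an element blocks in .take until a message
   arrives; the (hypothetical) ::done sentinel ends the seq."
  [^LinkedBlockingQueue q]
  (lazy-seq
    (let [msg (.take q)]
      (when-not (= ::done msg)
        (cons msg (queue-seq q))))))

(def q (LinkedBlockingQueue.))

;; The consumer occupies a thread of its own for as long as the seq is open.
(def consumer (future (doall (map inc (queue-seq q)))))

;; .put is a side effect: inside a dosync that retries, it would run again.
(doseq [m [1 2 3 ::done]]
  (.put q m))

(def result @consumer)
```

If the sentinel never arrives, the consumer thread stays parked in `.take` indefinitely, which is the starvation problem listed above.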

pub-sub events


Pros:
  • node.js does it, so can we
  • Simplest asynchronous mechanism there is

Cons:
  • If no one's listening, the data disappears
  • node.js imposes order on events by using a single thread; to do the same, Clojure must either:
    • Make events per-thread (a la Erlang actors), which doesn't play nicely with Clojure's existing concurrency primitives
    • Impose an ordering using state, which is arguably something that should be abstracted away
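
The "data disappears" problem is easy to see in a toy publisher. The names `listeners`, `subscribe!`, and `publish!` are hypothetical, and the sketch makes no attempt at ordering across threads.

```clojure
(def listeners (atom []))

(defn subscribe! [f]
  (swap! listeners conj f))

(defn publish!
  "Calls every registered listener with msg and returns how many were notified."
  [msg]
  (let [fs @listeners]
    (doseq [f fs]
      (f msg))
    (count fs)))

(def seen (atom []))

(def notified-early (publish! :early))   ; nobody is subscribed, so :early is lost
(subscribe! #(swap! seen conj %))
(def notified-late (publish! :late))
```

After this runs, `seen` holds only `:late`; nothing retained `:early` for the listener that arrived afterwards.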

the proposed approach

Channels are described in detail here, and a method of using them to compose asynchronous workflows is described here.

Some notable qualities about channels, to contrast against the other approaches:

  • When no callbacks are registered, messages queue up
  • Callbacks can either consume all messages (receive-all) or just some messages (receive, receive-while)

The combination of these two features means that we can choose between a push or pull model, depending on the situation. It also saves us from the transaction/side-effect issue, since we can divert messages into a receiver-less channel while in a transaction, and then use those messages to achieve side effects outside the transaction.
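
To make the two qualities concrete, here is a toy, ref-backed channel. It is only a sketch under stated assumptions: `receive` and `receive-all` borrow the names used above, but `toy-channel`, `enqueue`, the signatures, and the delivery rules are invented for illustration and are not the actual channel implementation. It also makes no ordering guarantees under concurrent use.

```clojure
(def empty-queue clojure.lang.PersistentQueue/EMPTY)

(defn toy-channel []
  (ref {:messages empty-queue   ; messages waiting for a receiver
        :once     []            ; one-shot callbacks (receive)
        :all      []}))         ; permanent callbacks (receive-all)

(defn enqueue
  "Queues msg if nobody is listening; otherwise hands it to the callbacks.
   Callbacks run after the state change, never inside this function's dosync."
  [ch msg]
  (let [callbacks (dosync
                    (let [{:keys [once all]} @ch]
                      (if (and (empty? once) (empty? all))
                        (do (alter ch update-in [:messages] conj msg) [])
                        (do (alter ch assoc :once [])
                            (into once all)))))]
    (doseq [f callbacks]
      (f msg))))

(defn receive
  "Consumes exactly one message: a queued one now, or else the next to arrive."
  [ch f]
  (let [taken (dosync
                (let [msgs (:messages @ch)]
                  (if (seq msgs)
                    (do (alter ch assoc :messages (pop msgs))
                        [(peek msgs)])            ; boxed so nil messages still count
                    (do (alter ch update-in [:once] conj f)
                        nil))))]
    (when taken
      (f (first taken)))))

(defn receive-all
  "Consumes everything queued so far, plus every future message."
  [ch f]
  (let [pending (dosync
                  (let [msgs (:messages @ch)]
                    (alter ch #(-> %
                                   (assoc :messages empty-queue)
                                   (update-in [:all] conj f)))
                    msgs))]
    (doseq [m pending]
      (f m))))

(def ch  (toy-channel))
(def log (atom []))

;; No receiver yet, so enqueueing inside a transaction only touches the ref.
(dosync
  (enqueue ch :a)
  (enqueue ch :b))

;; Outside the transaction: pull one message, then switch to push for the rest.
(receive     ch #(swap! log conj [:one %]))
(receive-all ch #(swap! log conj [:all %]))
(enqueue ch :c)
```

Because a receiver-less `enqueue` invokes no callbacks, a retried transaction simply redoes the `alter`, and the callbacks that perform side effects only see messages once they are consumed outside the transaction.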

  1. Nov 01, 2010

    No one should try to do any design work for async in Clojure without thoroughly understanding what Erik Meijer has done with .Net Reactive Extensions.

    1. Nov 02, 2010

      When I first looked at RX, it struck me as pretty similar to what I'm proposing: creating an asynchronous analogue to sequences, allowing for easy composition and leveraging people's existing knowledge of sequence operators.  Looking at it more carefully, this still seems to hold true.  Am I wrong on this count?

      I realize this proposal is unusual in being so fully implemented, but I'd be happy to throw it all away if it results in a standard interface everyone can get behind.  If you prefer, feel free to simply treat this as a request to have something like RX in contrib.

  2. Nov 01, 2010

    I've looked over the spec, but I'll give it a more thorough read.  Is there anything in particular you'd call my attention to, or just the design as a whole?

    1. Nov 05, 2010

      The most comprehensive document I could find was the hands on lab.

  3. Nov 06, 2010

    I think one important distinction is that RX observables are not necessarily queues.

    1. Nov 06, 2010

      That's true.  My contention is that the queuing behavior is necessary to make events play well with transactions, but there may be an alternative approach I'm not seeing.

      1. Nov 06, 2010

        Maybe if events were integrated with Clojure's STM, events generated within a transaction could be held until the transaction commits, similar to how Agent sends behave now.

        1. Nov 06, 2010

          Just for the record, a parallel discussion is going on in the Ring mailing list.

          You're right, that would fix the problem with enqueueing inside a transaction. However, I also think that the queueing behavior can make consuming messages a lot simpler.

          Consider a reduce operation over a stream of events. If events simply flow through the channel the handler has to be stateful, mutating the value as events arrive. But if we can consume messages one at a time, we can simply pass the intermediate value into the next callback. This also narrows the scope of the transaction to just handling the queue of messages, rather than over the entire reduce operation.

          In my experience with writing Aleph, most of the state in event-driven programming results from wanting the contents of a message to affect how future messages are handled. This is possible with a push-only approach, but with the pull model the state is completely abstracted away.  Giving the developer the choice to consume a stream via either push or pull (and allowing him to switch mid-stream) is empowering.

          1. Nov 07, 2010

            It really seems like what we want is something like CPS, channels you publish to being similar to a continuation and channels you receive from being like a function that takes a continuation.

            Sending to a channel would be as simple as calling the channel on the value to send. For receiving from a channel you would do something like (channel call-back), which would return a channel that would contain results from the callback, if any. The call-back would be passed three values: the original channel, so it can schedule (if that's the right word) itself again to be called with the next value from that channel; the results channel, which it can use to emit results if it chooses (for implementing map/filter/etc.); and the value from the original channel.

            I am just starting to play with F# but this seems somewhat analogous to the syntax they have introduced for async workflows. inside an async workflow I believe you can call return and return! one of which, uh, returns back into the workflow(I think) and the other yields a value.

            It would actually be possible to just pass a single value to the call-back and have access to the original channel and result channel via dynamic binding, which I think would make the user experience more like added syntax, but I am not sure which of the two I prefer.

            1. Nov 07, 2010

              If you haven't already, take a look at the pipelines wiki, which seems similar to what you're describing, though I'm not sure I completely understand you. It would be instructive for me if you took a look and described any fundamental differences between the two approaches.

  4. Dec 06, 2010

    I've done some experimental work on an RX-like library for Clojure