Update: April 7
Reddit user halgari (who wrote the core.async go macros and has an excellent series of videos on how they work, which can be found here) pointed out that using a `>!!` in a function that would be placed in a go-loop can lead to some nasty bugs and block the thread pool. I’ve updated my code to reflect this.
@priyatam on the clojurians slack pointed out some issues running the code; I’ve resolved these as well.
As we write core.async over time, we start to see some idioms and patterns emerge. We’ll start with probably the simplest one: the producer. If anyone has a better name for this pattern I’d be happy to hear it, but for now it should be okay.
A producer is used when we need to:
- Generate a stream of values or fetch them from an outside source.
- Place them on a channel.
That’s it! They usually function as some kind of I/O for an application. Here are some examples:
- Consuming values from a message queue, database, or log file.
- Handling key, mouse, and touch events from the browser.
They’re simple and powerful. Let’s take one apart:
Suppose we have a web store of some sort, and we want to track clicks. These clicks are published to an append-only log, like Kafka.
Our Kafka client is pretty simple: it gives us a message based on the offset we provide to it. `(take! connection 0)` will give us the first message, and so on.
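A minimal in-memory stand-in for that client might look like this (the names and data here are my own illustration, not the post’s actual code):

```clojure
;; An in-memory stand-in for the log client. `connection` is just a
;; vector of messages, and `take!` hands back the message at an offset.
(defn take! [connection offset]
  (get connection offset))

(def connection ["click-1" "click-2" "click-3"])

(take! connection 0) ;; => "click-1"
```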
This append-only log doesn’t track a whole lot of state, so it’s up to our producer to track the offset and increment it after we’ve consumed a message.
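A sketch of such a producer, assuming a `take!` client like the one above and an `out` channel supplied by the caller:

```clojure
(require '[clojure.core.async :refer [go-loop >!]])

;; Pull messages off the log one offset at a time and put them on `out`.
;; `>!` parks the go block until a consumer is ready, which is what gives
;; us backpressure. When `take!` returns nil the log is exhausted and the
;; loop ends.
(defn producer [connection out]
  (go-loop [offset 0]
    (when-let [msg (take! connection offset)]
      (>! out msg)
      (recur (inc offset)))))
```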
That’s it! The producer will keep pulling values off of our log and putting them on the channel.
It respects backpressure, so it will only read values so long as the rest of the system downstream can process them, and otherwise should perform nicely.
But we talk to our log over the network. And networks are, well…
Our fake client looks like this:
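Something along these lines, sketched under the same assumptions as before: the API is identical, but the client now fails some of the time:

```clojure
;; Same shape as the reliable client, but ~10% of calls throw to
;; simulate the network dropping our connection.
(defn take! [connection offset]
  (if (< (rand) 0.1)
    (throw (ex-info "connection lost" {:offset offset}))
    (get connection offset)))
```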
I’m going to crib liberally from Joe Armstrong’s wonderful dissertation
Making Reliable Distributed Systems in the Presence of Software Errors,
because I believe that
core.async shares some of the same goals as
Erlang does. We’ll discuss this more later, but there are three important features
that the two share:
- Isolation of state.
- Message passing.
- Isolation of errors.
1 and 2 are very closely connected. Keeping state local to a go block or an actor means that we only need to worry about one piece of code modifying our data.
We can share state in core.async (atoms, records with mutable fields), but I would argue that it’s generally a bad idea and I don’t think I’ve ever had cause to do it.
Message passing is just a consequence of this. If we can’t share state, go blocks still need some way to interact. Values over channels function as messages because we assume nothing about the receiving process.
Error isolation is interesting. For a long time I thought the way core.async silently swallowed exceptions was a flaw or a necessary evil of its design. Reading Armstrong’s thesis turned me around a bit. For a core.async system to continue to function in the presence of errors, each piece of it should be able to fail independently without affecting the others.
Unfortunately, this producer doesn’t satisfy that. It will chug along happily until one of our session exceptions is thrown and then it will stop. It won’t log anything, the channel will just stop producing values. This will result in anything downstream of our producer halting as well, since there are no more values to take.
We can do better
Let’s pull everything that would operate in the go-loop into its own function:
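A sketch of that extraction (`take-message` is my name for it, not necessarily the post’s): the function does the risky work and returns either a message, nil, or the exception itself, so nothing throws inside the go block:

```clojure
;; One step of the producer's work: fetch the message at `offset`.
;; Instead of letting an exception escape, return it as a value the
;; go-loop can dispatch on.
(defn take-message [connection offset]
  (try
    (take! connection offset)
    (catch Exception e
      e)))
```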
And modify our go-loop to call the function instead.
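Assuming a helper like the `take-message` function described above, the revised loop might look like:

```clojure
(require '[clojure.core.async :refer [go-loop >!]])

;; The go-loop now only dispatches on what `take-message` returned:
;; an exception becomes the go block's return value, nil ends the loop,
;; and anything else goes out on the channel.
(defn producer [connection out]
  (go-loop [offset 0]
    (let [msg (take-message connection offset)]
      (cond
        (instance? Exception msg) msg
        (nil? msg)                :done
        :else                     (do (>! out msg)
                                      (recur (inc offset)))))))
```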
Now, our go-loop has virtually no logic in it aside from message dispatch. It also returns a value if an error occurs. This will allow us to introduce our next pattern:
A very simple supervisor
Suppose we caught this problem in one of our production systems. While we’d like to solve it properly, in the meantime we’re going to handle the connection error by restarting the producer. This will probably get us in trouble later, but luckily our workplace is a work of fiction, so I can make a point.
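A minimal sketch of what that might look like (again, the names are my own): a go block returns a channel that yields the block’s result, so the supervisor can wait on the producer and restart it when the result is an exception:

```clojure
(require '[clojure.core.async :refer [go-loop <!]])

;; Run the producer, wait for its go block to finish, and restart it if
;; it finished with an error. Whatever recovery logic we like would go
;; before the `recur`.
(defn supervise [connection out]
  (go-loop []
    (let [result (<! (producer connection out))]
      (when (instance? Exception result)
        (recur)))))
```

Note that this naive restart begins again at offset 0; resuming from the last good offset is left out of the sketch.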
This will keep our producer going and restart it in the event of an error. And, dear reader, there will be errors.
But there are some immediate benefits as well! The first is that we’ve achieved isolation for our producer. Now if it fails we can apply whatever recovery logic we like and continue producing to the same channel without making any changes to the rest of our pipeline. A small step, to be sure, but our system is the better for it.
Next time we’ll see if we can’t work all of these great ad-hoc patterns into something a little more reusable.
The code for all of these examples can be found here