The companion code to this post can be found here
Reddit user nefreat recommends this post for more info on transducers and comp ordering with transducers.
Initially my goal for this post was to write about error handling in pipelines. I wrote some code and experimented with a bunch of scenarios and concluded that such a post might be boring and that there wouldn’t be much meat there. This is because pipelines are, in my opinion, at the time of this writing, the single most robust abstraction in the entire core.async quiver.
I am not even joking. Let’s take a look at what pipelines manage to pack into a few small functions:
- Error handling
- Easy-mode parallelism (just change a number)
- Ordering guarantees
Let’s talk about these.
In order to understand what pipelines give us for composition, first we have to talk a little bit about transducers. I’m not going to attempt to explain the transducer protocol or write any transducers from scratch right now. I highly encourage you to take a look at the clojure.core code for distinct. To start building pipelines, though, all we really need to care about is how to use the mapping and filtering transducers.
The tl;dr for why transducers are worth your time is this: Transducers don’t care where data comes from, they only care about how to transform it. It is for this reason that transducers can operate over collections, operate lazily and eagerly, and be packed up with a bunch of data and passed around for later evaluation (I’ve never had to write an eduction in my working life but we have an informal competition at work to find the use case and I’ve no doubt there is one).
What does this mean for us? Transducers are the natural way to operate on data coming through channels.
Enough talk, here are some examples. The project that I started working on for this post is a system for tracking wikipedia edits. There were two possible ways to do this: one is through a socket.io websocket (I had very little luck finding a Clojure client that would readily consume it) and the other, somewhat amusingly, is an IRC channel.
I mean it sounds funny but it’s actually a great solution to the problem. A channel can operate as a pub-sub system that bots can readily consume and all of the events are human-readable as well, which made it pretty fun to figure out how to consume and groom these events for our purposes.
Let’s assume we have an IRC producer (I’ll publish my code for it, though I make no guarantees about its stability, as it’s built on top of an old Raynes library). As Clojurists, our first instinct is to turn this into structured data.
Here’s what an event from the producer looks like:
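Roughly like this. This is a sketch, not a verbatim capture: the key names match the discussion below, but real events carry more bookkeeping keys and the `:text` is littered with mIRC colour codes.

```clojure
;; a sketch of a raw event from the IRC producer; real events carry
;; more keys, and :text arrives full of mIRC colour codes
(def raw-event
  {:command "PRIVMSG"
   :nick    "rc-pmtpa"
   :target  "#en.wikipedia"
   :text    "[[Some article]] https://en.wikipedia.org/w/index.php?diff=123 * SomeUser * (+42) fixed a typo"})
```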
First things first, let’s look at this map and see what we care about.
Command is useful because we may get a number of message types from IRC, and we’re interested in PRIVMSG.
Nick is useful because only one user actually publishes these edit events. These channels aren’t very noisy but if they were we would end up getting bad data.
Target isn’t useful to us now, but if we wanted to watch all of the edit channels for all of wikipedia, this would allow us to split that data when the time came.
Text contains pretty much all of the metadata that we might want to do ~big data~ on.
So we want to map the above event to something that looks like this:
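Something like this: the same four keys we just picked out, and nothing else (again a sketch, with the colour codes already gone):

```clojure
;; the trimmed event: just the four keys we care about
(def trimmed-event
  {:command "PRIVMSG"
   :nick    "rc-pmtpa"
   :target  "#en.wikipedia"
   :text    "[[Some article]] https://en.wikipedia.org/w/index.php?diff=123 * SomeUser * (+42) fixed a typo"})
```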
That’s a little less noisy, isn’t it?
Baby’s first transducer.
If we wanted to write a function to do this, it would be pretty small, wouldn’t it? It’s already in core, in fact.
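A sketch (the names are mine; select-keys is doing all the work):

```clojure
;; trim-event just keeps the parts of an IRC event we care about
(defn trim-event [event]
  (select-keys event [:command :nick :target :text]))

;; calling map with a function and no collection returns a mapping transducer
(def trim-tx (map trim-event))
```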
Now if we had a list of this data that we wanted to transform eagerly, we’d map this function over that list. If we don’t provide a collection to map, though, we get a mapping transducer. All that means is that we can plug this into a number of functions to perform this transformation.
When I was testing it in the REPL, I did just that, building my transducer and using into to pass my collection of input into a collection of output.
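In other words, something like this (captured-events is a stand-in for data I'd saved off the wire):

```clojure
;; a stand-in for events captured off the wire
(def captured-events
  [{:command "PRIVMSG" :nick "rc-pmtpa" :target "#en.wikipedia"
    :text "[[Foo]] ..." :host "irc.wikimedia.org"}])

;; into takes a transducer, so this eagerly pours input into output
(into []
      (map #(select-keys % [:command :nick :target :text]))
      captured-events)
```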
Insight One: Transducers are testable as all get-out.
Because transducers decouple our actual transformations from the data we pass through them, we can easily supply mock data to a transducer and assert on what comes out the other side. We can run tests on large samples of captured data, or if we’re feeling sassy we can write generators to generate all kinds of valid or invalid data.
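A test, then, is nothing more than a transducer, a literal input, and an assertion on what comes out:

```clojure
(require '[clojure.test :refer [deftest is]])

;; mock data in, assertion on what falls out the other side
(deftest drops-extra-keys
  (is (= [{:nick "rc-pmtpa"}]
         (into [] (map #(select-keys % [:nick]))
               [{:nick "rc-pmtpa" :extra "junk"}]))))
```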
This is nice but we still have work to do.
Next up, let’s filter out everything that doesn’t come from the producer of these events. Their IRC nick is “rc-pmtpa”.
This is another one-liner: all we need to do is pull the :nick key out of the map and filter on it.
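As with map, calling filter without a collection hands back a transducer (the names here are mine):

```clojure
(defn from-bot? [event]
  (= "rc-pmtpa" (:nick event)))

;; filter with a predicate and no collection returns a filtering transducer
(def bot-tx (filter from-bot?))
```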
How, pray tell, are we going to combine these two transducers?
Insight two: Transducers are composable
Let’s just alter that previous tx definition a bit.
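A sketch, with everything written inline:

```clojure
;; one transducer: filter on the nick, then trim the keys
(def tx
  (comp (filter #(= "rc-pmtpa" (:nick %)))
        (map #(select-keys % [:command :nick :target :text]))))
```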
That’s it, we can compose these two functions and get a function that will:
- filter out the nick we don’t want
- select only the keys we want.
comp returns a function which applies its right-most argument first, then each remaining argument moving leftward. This would lead us to believe that composed transducers work the same way, right?
Unfortunately, no. Here’s a quick REPL session:
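Reconstructed from memory, but you can verify it at any REPL:

```clojure
;; plain function composition: the right-most function runs first
((comp inc #(* 10 %)) 5)
;; => 51   (5 * 10 = 50, then inc)

;; composed transducers: the LEFT-most step sees the data first
(into [] (comp (filter odd?) (map inc)) (range 10))
;; => [2 4 6 8 10]   (filter the odds first, then inc them)

(into [] (comp (map inc) (filter odd?)) (range 10))
;; => [1 3 5 7 9]    (inc first, then keep the odd results)
```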
I’d like to explain this but if I start tracing transducer function signatures right now I’ll never get anything done. If anybody would like to get at me with a nice explanation of this, I’ll be happy to add it and credit you.
TL;DR, composition of transducers works effectively from left to right. This is nice because that’s how we tend to think of pipelines, but just be aware that there’s a tiny inconsistency when we use comp.
Transducers all night, apply from left to right
Functions mos def, apply from right to left
You’ve got the idea though, we can stick as many of these together as we’d like. They’re small, testable, composable, and way more efficient than the alternative, which is to write either one big go-loop that does more than we’d like it to, or write a bunch of small go-loops that don’t run in parallel and force us to hold intermediate copies of all of our data.
Let’s fast forward a bit.
Thanks for sticking with me so far, I know this is long but there’s a lot to talk about. Let’s skip to our completed transducer.
Now we select only the keys we want, filter on the nickname, strip mIRC colours out (those ugly caret-C’s spattered all over our text), parse the text into some metadata, and finally add that metadata to our map.
Oh, also we log the result out to console.
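In code, roughly (strip-mirc-colours and parse-edit-text here are toy stand-ins for the real parsing functions):

```clojure
(require '[clojure.string :as str])

;; toy stand-in: delete colour-code sequences (^C plus up to two digits)
(defn strip-mirc-colours [s]
  (str/replace s #"\u0003\d{0,2}" ""))

;; toy stand-in: the real version pulls out page title, user, diff url...
(defn parse-edit-text [s]
  {:parsed s})

(def wiki-edit-tx
  (comp (filter #(= "PRIVMSG" (:command %)))
        (filter #(= "rc-pmtpa" (:nick %)))
        (map #(select-keys % [:command :nick :target :text]))
        (map #(update % :text strip-mirc-colours))
        (map #(merge % (parse-edit-text (:text %))))
        (map (fn [event] (println event) event)))) ;; log and pass through
```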
Key insight #3: Transducers can have side-effects.
We want to be careful with this, because it’s very easy to jam up a pipeline by depending on external calls, connections, reads/writes to disk, etc. It also requires us to put more error handling in our pipeline.
It is also potentially incredibly powerful. Here’s an idea I’m playing with. I haven’t used this in production, use at your own risk, but I think it’s interesting. Also it might not compile, consider it a sketch.
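The idea is a middleware-ish helper that observes every element on its way through without altering it:

```clojure
;; observing wraps a side-effecting function as a pass-through transducer
(defn observing [observe-fn]
  (map (fn [x] (observe-fn x) x)))

(def seen (atom 0))

(def tx
  (comp (filter odd?)
        (observing (fn [_] (swap! seen inc))) ;; a toy metrics hook
        (map inc)))
```

Swap the atom for a real metrics client or a logger and the shape stays exactly the same.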
This allows us optional logging or metrics middleware that we can either accumulate in our data as we pass it through or report to some external source. With stateful transducers there are some even fancier possible measurements, like throughput or frequency counts.
Parallelism, done right, is always just an optimization. Only some kinds of problems are parallel but if we can
phrase a problem as a parallel problem, a good abstraction should be able to do the rest. Luckily, we already have
some of these abstractions.
map and filter are both parallelizable because they guarantee that each piece of data will be independent from the other pieces. As a result, anything built on mapping and filtering transducers should be something we can parallelize easily.
We can see this in pmap, which works exactly like map but with multiple worker threads (I don’t really like pmap because I’ve never taken the time to learn its gotchas and tradeoffs).
Pipelines handle this brilliantly, taking an argument for the number of workers and doing the rest.
Let’s look at a pipeline definition.
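A sketch of the wiring. The transducer here is a stand-in for the completed one we just built, and the channel sizes are arbitrary:

```clojure
(require '[clojure.core.async :as async])

(def wiki-edit-tx (map identity)) ;; stand-in for the completed transducer

(def events-in  (async/chan 1024))
(def events-out (async/chan 1024))

;; 8 is the number of parallel workers; the final true means
;; events-out closes when events-in closes
(async/pipeline 8 events-out wiki-edit-tx events-in true)
```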
So there’s a lot here, but some really nice properties. If we want to run the same code we’re running on a c3.8xlarge up in AWS, we need but change a number to take advantage of 32 cores.
We can choose whether to close channels or not. If we have multiple pipelines feeding the same channel (I’ve never done this but it’s certainly feasible), we can do that without a single pipeline taking down the rest of the system.
The exception handler lets us do all sorts of interesting error handling. If we want to kill the whole pipeline, we can do that. If we want to close a consumer channel or send a restart signal or any manner of things, we can do that. Or we can just log the error, in which case the consumer of the pipeline won’t ever see the data we’ve dropped.
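For instance, a hedged sketch of a log-and-drop handler. The handler receives the thrown exception; any non-nil return value is placed on the out channel, so returning nil drops the offending item:

```clojure
(require '[clojure.core.async :as async])

(defn log-and-drop
  "Log the exception and drop the input that caused it."
  [ex]
  (println "pipeline error:" (.getMessage ex))
  nil) ;; nil means nothing reaches the out channel

(def in  (async/chan 1024))
(def out (async/chan 1024))

;; a transducer that throws on 0 input, to exercise the handler
(async/pipeline 8 out (map #(/ 100 %)) in true log-and-drop)
```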
Abort, Retry, Ignore, Escalate
As one of my areas of interests is error handling in this sort of system, there’s a small thread to pick up. Generally, we can respond to error conditions one of four ways. We can bail on the entire computation, try again, ignore the erroneous input, or decide that this is above our paygrade and send it to something with more context to handle.
The last option is something I’m still wrestling with in core.async. If our pipeline only contains pure functions, retry isn’t really an option without altering the input data (something I don’t generally think we should do). This gives us only two real options: we can kill the pipeline or we can ignore the input.
Kill in Dev, Log in Production
While working on this IRC pipeline, I started it up and left it overnight, happy path only, to see if it would crash. If this system had any need for stronger guarantees it would be a good idea to try to subject it to connection problems and bad data as well, but failing early and often is likely the best way to find instances of errors.
Once our service is out in the world, there is no tool better to determine what went wrong than logs. Lots of logs. Any event that happens in a system warrants at least a small log, and log levels are our friends. If an info log is too chatty in production (as I’ve recently discovered in a bug in one of our services), it’s a good bet that the service may be doing something much more often than is necessary.
One of the nicest properties of pipelines is that they preserve event ordering. This allows us to safely implement a number of concurrent systems that we can easily reason about. This leads us to one of the weaknesses of a pipeline, however.
Pipelines work best when their work is roughly uniform in size
Luckily for our wikipedia edits example, this holds true. Consider a reporting pipeline that pulls a wide variety of events. Some of these are small maps, some of these are large data blobs.
Let’s say that a small map M takes roughly 3ms to process and a large data blob B takes roughly 300ms to process.
Due to ordering constraints, a batch of events will be constrained to the slowest event.
Let’s do a little bit of greasy arithmetic:
Suppose that B represents 10% of our input events and M represents 90% of our input events.
We have a number of workers, W, that process pipeline events.
The maximum throughput of our pipeline will be, roughly, W divided by the expected batch time, and that expected time is dominated by whether the batch contains a B.
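Following that batch framing (W workers pull W events, and the batch finishes when its slowest member does, with blobs appearing independently at 10%), a back-of-the-envelope model:

```clojure
(defn expected-batch-ms
  "Expected time for a batch of w events: 300ms if any event is a
   blob B (probability 1 - 0.9^w), 3ms if all are small maps M."
  [w]
  (let [p-all-small (Math/pow 0.9 w)]
    (+ (* 3 p-all-small)
       (* 300 (- 1 p-all-small)))))

(defn throughput-per-sec
  "Roughly w events delivered per expected batch."
  [w]
  (/ (* w 1000.0) (expected-batch-ms w)))
```

With 8 workers, more than half of all batches contain a blob, so the expected batch time already sits around 170ms and throughput lands near 46 events per second: most of our cheap 3ms events spend their time waiting on one expensive neighbour.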
Exercises left to the reader.
This post is much longer than I meant it to be, but I hope it’s helpful. Here are some things to think about:
- How do pipeline, pipeline-blocking, and pipeline-async differ? In what situations might we use each?
- How would we generalize that throughput calculation for any number of events of any size?
- How would we check our throughput model to see if it’s true?
- How would we write a pool function that works exactly as pipeline does but relaxes ordering?