January 18, 2023
This is our third blog post about the parallel streaming evaluator we use at Channable, where we use Haskell with the Conduit library to produce both result values and parallel work units in a single stream. In the previous blog posts we explained the basics of our evaluator and how it can be used to implement a variety of actions. Read those first!
Whereas the first two blog posts were intended to be readable for an entry-level Haskell programmer, we'll now go a lot more in depth and we will assume more background knowledge about Haskell and general software technology.
We will discuss how we can execute work in parallel and look at the internals of Conduit to do this efficiently. For a good understanding it helps if you have a bit of hands-on experience with the Conduit library. We will also explain why `TMVar` is faster than `MVar` in our situation and how we decide how many threads a job should use.
In the first blog post we introduced the following way to run a conduit that produces `WOWork` parallel work units:

    runConduitWithWork :: Int -> ConduitT () (WorkOr Void) IO r -> IO r

This function is responsible for consuming the top-level conduit and it should run the work units on multiple threads. We'll dive a bit deeper into how this can be implemented, but first we need to take a look at how the regular `runConduit` works. A `ConduitT` contains a `Pipe` inside. The `runConduit` function takes this `Pipe`, does some stuff that's out of the scope of this blog post, and calls an internal evaluation function `runPipe` on it:

    runConduit :: Monad m => ConduitT () Void m r -> m r
    runConduit (ConduitT p) = runPipe $ injectLeftovers $ p Done

`Pipe` has 5 constructors that all represent a particular state that the conduit can be in. Most of the states also include what the next state will be, either directly or as a continuation that first requires an argument. You can look at the full definition of `Pipe`, but only small parts are relevant to us due to the constraints that `runPipe` puts on the parameters. It's easier to explain only the relevant bits within the context of `runPipe`:

    runPipe :: Monad m => Pipe Void () Void () m r -> m r
    runPipe = \case
      HaveOutput _ o -> absurd o
      NeedInput _ c -> runPipe (c ())
      Done r -> return r
      PipeM mp -> mp >>= runPipe
      Leftover _ i -> absurd i

The `HaveOutput` case usually means that an output has been produced. Here `runConduit` has restricted the output to be of type `Void`, so this state cannot occur in `runPipe` (in the absence of `undefined` and co). For intermediate conduits we do have `Pipe`s with a `HaveOutput` constructor, but those generally get consumed by a sink like `sinkVector`, which collects the outputs into the final result.
The other cases are less important for our story, but for completeness:
`NeedInput` means that an input must be passed to the conduit before we can continue. The input type is always `()` so we pass it a trivial `()` value and get a new pipe that we can recurse with.
`Done` means the conduit is finished and we can return the result of type `r`.
`PipeM` means that some work in the `m` monad must be performed, and then we get a new pipe to recurse with.
`Leftover`s can't occur in this `runPipe` call because the relevant parameter is `Void` (they are already handled by `injectLeftovers`).
In the `runConduitWithWork` implementation we run the pipe in a very similar fashion, but this time the pipe gets stored in a shared `MVar` and the different threads take turns in evaluating it. The `MVar` acts as a locking mechanism so that only one thread can have the pipe at any point in time. Each thread keeps evaluating the pipe until it encounters some parallel work, at which point it puts back the pipe, evaluates the work (which may take a while), and waits until it gets the pipe again.
If any worker finishes because it encounters a `Done` constructor, the other threads are automatically cancelled and we return the result (through `Async.waitAnyCancel`):

    runConduitWithWork :: Int -> ConduitT () (WorkOr Void) IO r -> IO r
    runConduitWithWork numThreads (ConduitT pipe) = do
      pipeVar <- newEmptyMVar
      threads <- sequence $ replicate numThreads $ Async.async $ runWorker pipeVar
      putMVar pipeVar $ injectLeftovers $ pipe Done
      snd <$> Async.waitAnyCancel threads

    runWorker :: MVar (Pipe Void () (WorkOr Void) () IO r) -> IO r
    runWorker pipeVar = loop
      where
        -- Take the pipe variable, so that no one else can.
        loop = takeMVar pipeVar >>= withPipe
        withPipe = \case
          HaveOutput pipe (WOWork w) -> do
            -- Put back the (modified) pipe for someone else to use, because we have work to do!
            putMVar pipeVar pipe
            liftIO w
            loop
          -- All the below is the same as 'runPipe' and is done within the critical section.
          -- This includes evaluation of upstream and monadic effects in the conduit.
          HaveOutput _ (WOValue o) -> absurd o
          NeedInput _ c -> withPipe (c ())
          Done r -> pure r
          PipeM mp -> mp >>= withPipe
          Leftover _ i -> absurd i

The big benefit of this implementation is its comparative simplicity: there is very little shared state (just one `MVar`) and therefore very little overhead. Communication between threads only happens when a parallel work unit is encountered.
In terms of CPU time this evaluator is almost as good as a single-threaded version of `runConduitWithWork` would be. Specifically, when only a single thread is used, this implementation will do some extra `takeMVar` calls when it encounters a `WOWork`, but besides that the hot loop doesn't involve any work that `runConduit` doesn't do. If you do have multiple threads but the workload is sequential (so there are no `WOWork`s), a single thread will run the entire conduit in exactly the same way as `runConduit` while the other threads remain idle.
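The pipe-passing loop described above can also be modelled without Conduit. The following is a minimal sketch under stated assumptions, not our production code: `ToyPipe`, `runToyPipe`, `worker`, `sumPipe` and `demo` are hypothetical names made up for this illustration, and `ToyPipe` only models the states that matter here rather than conduit's actual `Pipe`. Unlike the real implementation it waits for all workers to finish instead of cancelling them, and it does no exception handling.

```haskell
{-# LANGUAGE LambdaCase #-}

import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, putMVar, takeMVar)
import Control.Monad (replicateM)
import Data.IORef (IORef, atomicModifyIORef', newIORef, readIORef)

-- | Hypothetical, much simplified stand-in for conduit's 'Pipe'.
data ToyPipe r
  = Work (IO ()) (ToyPipe r) -- a unit of parallel work, plus the rest of the pipe
  | Step (IO (ToyPipe r))    -- a monadic step that yields the next pipe state
  | Finished r               -- the pipe is done

-- | Run the pipe on a number of worker threads that pass the pipe around.
-- Assumption of this sketch: we wait for every worker instead of cancelling.
runToyPipe :: Int -> ToyPipe r -> IO r
runToyPipe numThreads pipe = do
  pipeVar <- newMVar pipe
  dones <- replicateM (max 1 numThreads) $ do
    done <- newEmptyMVar
    _ <- forkIO (worker pipeVar >>= putMVar done)
    pure done
  results <- mapM takeMVar dones
  case results of
    r : _ -> pure r
    [] -> error "unreachable: at least one worker is started"

worker :: MVar (ToyPipe r) -> IO r
worker pipeVar = loop
  where
    -- Take the pipe, so that no one else can.
    loop = takeMVar pipeVar >>= withPipe
    withPipe = \case
      Work w rest -> do
        putMVar pipeVar rest -- hand the pipe back before doing the work
        w
        loop
      Step next -> next >>= withPipe -- still inside the critical section
      Finished r -> do
        putMVar pipeVar (Finished r) -- let the other workers see the end too
        pure r

-- | Example pipe: lazily produces one work unit per number, on demand.
sumPipe :: IORef Int -> [Int] -> ToyPipe ()
sumPipe ref = go
  where
    go [] = Finished ()
    go (x : xs) = Work (atomicModifyIORef' ref (\acc -> (acc + x, ()))) (go xs)

-- | Sum the numbers 1 to 100 using four workers.
demo :: IO Int
demo = do
  ref <- newIORef 0
  runToyPipe 4 (sumPipe ref [1 .. 100])
  readIORef ref
```

Note that no work queue appears anywhere: each worker asks the pipe for its next unit only when it is ready to run one.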
The Software Transactional Memory (STM) framework in GHC provides a bunch of functionality for concurrency and parallelism. STM is not always the right tool for the job, but in our case, replacing the `MVar` with its STM counterpart (`TMVar`) gave a small but very measurable improvement!
Our theory is that the crucial difference lies in the fairness guarantees. Where an `MVar` ensures that all waiting threads get their turn in order, a `TMVar` only promises that some thread is woken and this allows the GHC runtime system to make more efficient choices.
For example, consider a sequence of events in which a worker thread takes the `pipeVar`, does some parallel work and wants to take the `pipeVar` again.
Now we reach a decision point! CPU 2 is still doing other stuff and now both worker threads are waiting for the `pipeVar`. If we're using a `TMVar`, CPU 1 will just continue running worker thread 1, taking the `pipeVar`. With an `MVar` we must ensure fairness and it will actively awaken worker thread 2. Worker thread 2 is then moved from CPU 2 to CPU 1.
In other words, the runtime system tries to make threads stick to one CPU but the fairness guarantees of an `MVar` sometimes prevent that. By using `TMVar`s we're working with the work stealing scheduler rather than against it.
The GHC event log provides a lot of insight into these kinds of events. What we observed there seems to match our theory. This reddit discussion was quite helpful as well in figuring this out, in particular one of the comments in it.
This doesn't mean that `TMVar`s are always faster than `MVar`s; it really depends on your use case. We tried applying the same trick in a few other places, but none of them saw a similar improvement.
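For reference, the swap itself is small in code terms: `takeMVar` and `putMVar` become `takeTMVar` and `putTMVar` wrapped in `atomically`. Below is a minimal sketch of that take/put discipline with a `TMVar` from the `stm` package. The helper `withShared` is a hypothetical name for this illustration and is not part of our evaluator; it is also not exception safe, since a failing step would leave the variable empty.

```haskell
import Control.Concurrent.STM (TMVar, atomically, newTMVarIO, putTMVar, takeTMVar)

-- | Take the shared value, run one step on it inside the critical section,
-- and put the updated value back. This mirrors the takeMVar/putMVar pattern,
-- but each take and put is its own small STM transaction.
withShared :: TMVar a -> (a -> IO (a, b)) -> IO b
withShared var step = do
  x <- atomically (takeTMVar var) -- blocks (retries) while the TMVar is empty
  (x', result) <- step x
  atomically (putTMVar var x')
  pure result

-- | Example: bump a shared counter and return its previous value.
bump :: TMVar Int -> IO Int
bump var = withShared var (\n -> pure (n + 1, n))

-- | Create a counter that starts at zero.
newCounter :: IO (TMVar Int)
newCounter = newTMVarIO 0
```

Whether this is faster than the `MVar` version depends on the workload, as noted above, so it is worth measuring both.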
We went through a bunch of iterations before we finally came up with the shown pipe-passing implementation of `runConduitWithWork`. Initially we used a separate conduit:

    runWork :: ConduitT (WorkOr a) a IO ()

The `runWork` conduit took care of consuming all the parallel work units, which allowed us to run the entire thing in the regular `runConduit`. Everything fits neatly in the Conduit framework, quite elegant!
However, `runConduit` will evaluate the whole conduit structure in a single thread. The obvious way for `runWork` to then evaluate parallel work is to put it in some shared queue, where other threads can pick it up to evaluate it. This immediately gives some additional overhead because all work goes through the queue and has to be moved between different threads.
You also need to make some other tricky decisions, for instance what should happen when the queue is full.
You could probably have `runWork` just block when the queue is full and get reasonable behaviour. Within our job evaluation system however, we want to strictly cap how many threads each job gets to use and this made everything tremendously complicated. To give you a rough idea of all the moving parts involved: We took the 'main' thread (the one that's running the conduit) and the 'extra' threads and tried to block/unblock them such that the total number of unblocked threads always matched the number of threads that were assigned to the job. When the queue was full, we would block the main thread (by doing a blocking `takeMVar` in the evaluation of the `runWork` conduit) and then unblock an extra worker thread. Conversely when the queue grew too small one of the extra worker threads would block and ensure that the main thread became unblocked.
The overhead of all of that was quite large, but for jobs that were a good fit for parallelization it was still an improvement. Once we switched to the current pipe-passing implementation, the performance difference was quite dramatic. Below we see a comparison between the load on actual production with `runWork` (the thin green line) and pipe-passing (the fat green line):
Most parallel evaluation systems, including the `runWork` conduit, involve a queue of tasks somewhere. Our pipe-passing implementation seems to get away without one. How does that work?
The conduit is the queue!
If you actually look at the work producers from the previous blog (`sinkItems` and the different aggregations), they all produce the work on demand. For instance, if we look closely at this example:

    yieldAndSink :: [Item] -> ConduitT i (WorkOr o) IO [Item]
    yieldAndSink items = yieldParallel 100 items .| sinkItems

When a worker thread pulls on this conduit, the `yieldParallel` will yield a single parallel stream and `sinkItems` will promptly convert it to a `WOWork` parallel work unit. This all happens within the worker thread, which then proceeds to evaluate the work unit and another worker thread gets a turn with the conduit.
The aggregations work in a similar fashion, producing work on demand so that no queue is necessary. Unlike the `yieldParallel` and `sinkItems` combination, they sometimes can't yet produce any work because they have to wait for other work to complete.
One interesting case is if you have a component that produces both `WOWork` and other things. For example you can construct a conduit that converts a bunch of parallel streams to a single sequential stream of items. This `sequentialize` is just like `sinkItems .| Conduit.yieldMany`, but could already start yielding items before it has received all the streams.

    sequentialize :: ConduitT (WorkOr ParallelStream) (WorkOr Item) IO ()

This has to deliver `WOWork` for the workers to evaluate, but once the results of the workers start coming in it has to choose between yielding items or yielding work. You want to provide enough work so that all the workers are kept busy, but not so much that you end up buffering huge amounts of items. In our implementation, we limit the total number of slots that it has for receiving work results. When all the slots are taken, it will first yield items from the first slot before yielding new work to fill a new slot.
If we check the literature, our pipe-passing implementation behaves similarly to continuation-stealing schedulers, while our old implementation was similar to the child-stealing variation.
Schmaus et al. in Nowa: A Wait-Free Continuation-Stealing Concurrency Platform note that continuation-stealing schedulers require fewer stack switches because "if the continuation is not stolen, which is the typical case, the worker can proceed without a stack switch". Figure 3 in that paper shows how that works. This is similar to what A. Robison already wrote in A Primer on Scheduling Fork-Join Parallelism with Work Stealing:
The previously mentioned irksome semantic difference between child-stealing and continuation-stealing is most evident when no thief is available. With continuation stealing, the order of execution becomes identical to as if the spawn and sync annotations were ignored.
Robison also notes that child-stealing "requires space proportional to n to hold the unexecuted tasks", but with continuation-stealing "the number of extant tasks is no more than P, the actual number of threads executing the loop in parallel". That's indeed what we observed as well, with the task queue disappearing.
In the pipe-passing implementation of our evaluator the pipe is a shared resource. Only one thread at a time can use/modify the pipe. The time between taking and returning the pipe can be called a critical section in concurrent programming parlance. So in other words, only one thread can be in the critical section at any point in time.
The time that a thread spends in the `runWorker` loop can then be roughly divided into three parts:
wait%: fraction of time spent waiting for the pipe (because some other thread has it)
crit%: fraction of time spent in the critical section, where it can use and modify the pipe
par%: fraction of time spent evaluating parallel work (without the pipe)
If we measure these numbers we can make a pretty good estimate of how many threads are actually useful. Intuitively, if a single thread spends 1% of time in the critical section you can probably throw lots of threads at the problem. If a single thread has 90% `crit` time there is only 10% remaining time where other threads can use the pipe, so any additional threads will just spend most of their time waiting while increasing overhead. In the setup that we use at Channable we limit the total number of threads that all running jobs can use, so a thread that spends most of its time waiting will directly reduce our job throughput.
The `wait`, `crit` and `par` time percentages tend to change a lot during the run time of a job. When the job contains a few aggregations they tend to separate it into different sections, each of which allows for a different amount of parallelization. It's therefore useful to continuously measure them and adjust the number of used threads accordingly.
A neat trick to get a good running estimate for these is by using an exponential moving average. This can be used to turn a simple on/off signal into an average percentage, and can be implemented efficiently as a filter. Unlike a simple moving average we don't have to remember any previous values, so no ring buffer or anything like that is necessary.
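As a rough illustration of that idea, here is a minimal sketch of an exponential moving average over an on/off signal. The names `emaStep` and `emaOnOff` and the smoothing factor `alpha` are assumptions made for this example, and it assumes evenly spaced samples; a real measurement would likely have to weight each update by the elapsed time.

```haskell
import Data.List (foldl')

-- | One update step of an exponential moving average.
-- 'alpha' (assumed to lie in (0, 1]) controls how quickly old samples fade.
-- Only the previous average is needed, so no history has to be stored.
emaStep :: Double -> Double -> Double -> Double
emaStep alpha avg sample = avg + alpha * (sample - avg)

-- | Turn an on/off signal into a running fraction between 0 and 1.
-- 'True' could for example mean "the thread was in the critical section".
emaOnOff :: Double -> Double -> [Bool] -> Double
emaOnOff alpha = foldl' step
  where
    step avg on = emaStep alpha avg (if on then 1 else 0)
```

For example, `emaOnOff 0.5 0 [True, True]` first moves the average from 0 to 0.5 and then to 0.75. A smaller `alpha` gives a smoother but slower-reacting estimate.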
To actually turn these numbers into an estimate of how effectively our threads can be used, we used the model from the paper "Modeling Critical Sections in Amdahl’s Law and its Implications for Multicore Design" by Stijn Eyerman and Lieven Eeckhout. In particular we used formula 11 and by fixing some of the variables we ended up with a neat function:

    modeledSpeedup
      :: Double -- Fraction of time spent in critical section (for a single thread)
      -> Int    -- Number of threads
      -> Double -- Estimated speed up compared to using a single thread

Then if we measure a `crit%` of 15% we can estimate the speedup for any number of threads, and the difference is then how effective each additional thread is:
| N | Speedup | Diff | Note |
|---|---------|-------|------|
| 1 | 100% | +100% | 1st thread is 100% effective |
| 2 | 196% | +96% | 2nd thread is 96% effective |
| 3 | 287% | +91% | |
| 4 | 375% | +88% | |
| 5 | 426% | +51% | 5th thread is only 51% effective! |
| 6 | 453% | +27% | |
| 7 | 475% | +22% | |
| 8 | 492% | +18% | |
This estimation isn't perfect because we're only accounting for the things we're measuring, and not everything else that's happening on the server (even within the server process). So far it's been working surprisingly well though.
At the moment we use a simple cap of 70% on thread effectiveness. So according to the table, with a `crit%` of 15% the job will try to use 4 threads. The threads are currently dealt out on a first-come, first-served basis, so we need the cap to prevent a single job from wasting all our CPU resources in an inefficient manner while starving other jobs.
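The cap can be expressed as a small selection function on top of any speedup model. The sketch below is an illustration under assumptions, not our actual code: `chooseThreads` and `tableSpeedup` are hypothetical names, and `tableSpeedup` simply hard-codes the rounded values from the table above instead of evaluating the model.

```haskell
-- | Pick the largest thread count for which every added thread is still
-- at least 'minEffectiveness' effective according to the speedup model.
chooseThreads
  :: (Int -> Double) -- Modelled speedup for n threads (1.0 for one thread)
  -> Double          -- Minimum effectiveness of each extra thread, e.g. 0.7
  -> Int             -- Maximum number of threads that may be used
  -> Int
chooseThreads speedup minEffectiveness maxThreads =
  1 + length (takeWhile worthIt [2 .. maxThreads])
  where
    -- The gain of going from (n - 1) to n threads.
    worthIt n = speedup n - speedup (n - 1) >= minEffectiveness

-- | The speedups from the table for a crit% of 15% (hard-coded example data,
-- only defined for 1 to 8 threads).
tableSpeedup :: Int -> Double
tableSpeedup n = [1.00, 1.96, 2.87, 3.75, 4.26, 4.53, 4.75, 4.92] !! (n - 1)
```

With these values `chooseThreads tableSpeedup 0.7 8` stops before the fifth thread, which only adds 51%, and so settles on 4 threads.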
A different approach would be a central system that decides which job can use the core most efficiently, based upon factors like priority, current workload, existing cores, etc. Jobs would just use as many threads as possible (even at low effectiveness) but if another job comes along that can make better use of those threads we would move them over. This is something we still would like to implement in the future.
Below we see an example of the lifetime of a single job that takes about 60 seconds. The blue line is how many threads it's using, and the red line is the total CPU time that the whole server process uses while running just this job. The live measurements of `wait%` and the speedup model allow the job to quickly jump to an appropriate number of threads while the job is going through different phases that allow different amounts of parallelism.
Our earlier blog posts showed that we can pass along computed values and parallel work units in the same stream, by using a `ConduitT` with the `WorkOr` type. We were able to implement a variety of actions in this way.
In this post, we showed that we can not only express all these different behaviours, but that we can also run them quite efficiently with our pipe-passing implementation.
We're very interested to hear if you know about libraries that do something similar to this, either in Haskell or in another language. Let us know on reddit or hackernews.
In the fourth and final blog post in this series, we will explain how we implemented conditionals. More precisely, we show how we can efficiently send only values that match a condition through an aggregation. As a preliminary to conditionals we need to ensure that evaluation is done in a non-blocking fashion, by allowing the conduit to signal the evaluator that no work is available at the moment.
1: Actually we use a `TMVar` instead of `MVar`, as is discussed in the next section, but using `MVar` makes the example code slightly easier to read.
2: In reality, threads run on capabilities. Those are not precisely the same as CPUs, but for our purposes we can pretend that they are.