Channable

Parallel streaming in Haskell: Part 3 - A parallel work consumer

January 18, 2023

This is our third blog post about the parallel streaming evaluator we use at Channable, where we use Haskell with the Conduit library to produce both result values and parallel work units in a single stream. In the previous blog posts we explained the basics of our evaluator and how it can be used to implement a variety of actions. Read those first!

Whereas the first two blog posts were intended to be readable for an entry-level Haskell programmer, we'll now go a lot more in depth and we will assume more background knowledge about Haskell and general software technology.

We will discuss how we can execute work in parallel and look at the internals of Conduit to do this efficiently. For a good understanding it helps if you have a bit of hands-on experience with the Conduit library. We will also explain why TMVar is faster than MVar in our situation and how we decide how many threads a job should use.

A parallel work consumer

In the first blog post we introduced the following way to run a conduit that produces WOWork parallel work units:

runConduitWithWork :: Int -> ConduitT () (WorkOr Void) IO r -> IO r

This function is responsible for consuming the top-level conduit, and it should run the work units on multiple threads. We'll dive a bit deeper into how this can be implemented, but first we need to take a look at how the regular runConduit works.

Every ConduitT contains a Pipe inside. The runConduit function takes this Pipe, does some stuff that's out of the scope of this blog post, and calls an internal evaluation function runPipe on it:

runConduit :: Monad m => ConduitT () Void m r -> m r
runConduit (ConduitT p) = runPipe $ injectLeftovers $ p Done

The inner Pipe has 5 constructors that all represent a particular state that the conduit can be in. Most of the states also include what the next state will be, either directly or as a continuation that first requires an argument. You can look at the full definition of Pipe, but only small parts are relevant to us due to the constraints that runPipe puts on the parameters. It's easier to explain only the relevant bits within the context of runPipe:

runPipe :: Monad m => Pipe Void () Void () m r -> m r
runPipe = \case
  HaveOutput _ o -> absurd o
  NeedInput _ c  -> runPipe (c ())
  Done r         -> return r
  PipeM mp       -> mp >>= runPipe
  Leftover _ i   -> absurd i

The HaveOutput case usually means that an output has been produced. Here, however, runConduit has restricted the output to be of type Void, so this state cannot occur in runPipe (in the absence of undefined and co). For intermediate conduits we do have Pipes with a HaveOutput constructor, but those generally get consumed by a sink like sinkList or sinkVector, which places the final result in a Done constructor.

The other cases are less important for our story, but for completeness:

  • NeedInput means that an input must be passed to the conduit before we can continue. The input type is always () so we pass it a trivial () value and get a new pipe that we can recurse with.
  • Done means the conduit is finished and we can return the result of type r.
  • PipeM means that some work in the m monad must be performed, and then we get a new pipe to recurse with.
  • Leftover can't occur in this runPipe call because the relevant parameter is Void (leftovers are already handled by injectLeftovers).

For our runConduitWithWork implementation we run the pipe in a very similar fashion, but this time the pipe gets stored in a shared MVar[1] and the different threads take turns in evaluating it. The MVar acts as a locking mechanism so that only one thread can have the pipe at any point in time. Each thread keeps evaluating the pipe until it encounters some parallel work, at which point it puts back the pipe, evaluates the work (which may take a while), and waits until it gets the pipe again.

If any worker finishes because it encounters a Done constructor, the other threads are automatically cancelled and we return the result (through waitAnyCancel).

runConduitWithWork
  :: Int -> ConduitT () (WorkOr Void) IO r -> IO r
runConduitWithWork numThreads (ConduitT pipe) = do
  pipeVar <- newEmptyMVar
  threads <- sequence $ replicate numThreads $ Async.async $ runWorker pipeVar
  putMVar pipeVar $ injectLeftovers $ pipe Done
  snd <$> Async.waitAnyCancel threads

runWorker :: MVar (Pipe Void () (WorkOr Void) () IO r) -> IO r
runWorker pipeVar = loop
  where
    -- Take the pipe variable, so that no one else can.
    loop = takeMVar pipeVar >>= withPipe

    withPipe = \case
      HaveOutput pipe (WOWork w) -> do

        -- Put back the (modified) pipe for someone else to use, because we have work to do!
        putMVar pipeVar pipe
        liftIO w
        loop

      -- All the below is the same as 'runPipe' and is done within the critical section.
      -- This includes evaluation of upstream and monadic effects in the conduit.
      HaveOutput _ (WOValue o) -> absurd o
      NeedInput _ c -> withPipe (c ())
      Done r -> pure r
      PipeM mp -> mp >>= withPipe
      Leftover _ i -> absurd i

The big benefit of this implementation is its comparative simplicity: there is very little shared state (just one MVar) and therefore very little overhead. Communication between threads only happens when a parallel work unit is encountered.

In terms of CPU time this evaluator is almost as good as a single-threaded version of runConduitWithWork would be. Specifically when only a single thread is used, this implementation will do some extra putMVar and takeMVar calls when it encounters a WOWork but besides that the hot loop doesn't involve any work that runConduit doesn't do. If you do have multiple threads but the workload is sequential (so there are no WOWorks), a single thread will run the entire conduit in exactly the same way as runConduit while the other threads remain idle.

TMVar is faster, in this case

The Software Transactional Memory (STM) framework in GHC provides a bunch of functionality for concurrency and parallelism. STM is not always the right tool for the job, but in our case, replacing the MVar with its STM counterpart (TMVar) gave a small but very measurable improvement!

Our theory is that the crucial difference lies in the fairness guarantees. Where an MVar ensures that all waiting threads get their turn in order, a TMVar only promises that some thread is woken and this allows the GHC runtime system to make more efficient choices.

For example if we have this sequence of events in the runConduitWithWork implementation:

  • Worker thread 1 is doing parallel work on CPU 1[2]
  • Worker thread 2 is doing parallel work on CPU 2
  • Worker thread 1 finishes the parallel work and takes the pipeVar
  • Worker thread 2 finishes the parallel work and needs to wait on the pipeVar
  • CPU 2 can't do anything for worker thread 2, so the runtime system finds some other thread to run (not related to runConduitWithWork)
  • Worker thread 1 returns the pipeVar, does some parallel work and wants to take the pipeVar again.

Now we reach a decision point! CPU 2 is still doing other stuff and now both worker threads are waiting for the pipeVar. If we're using a TMVar, CPU 1 will just continue running worker thread 1, taking the pipeVar. With an MVar we must ensure fairness and it will actively awaken worker thread 2. Worker thread 2 is then moved from CPU 2 to CPU 1.

In other words, the runtime system tries to make threads stick to one CPU, but the fairness guarantees of an MVar sometimes prevent that. By using TMVars we're working with the work-stealing scheduler rather than against it.

The GHC event log provides a lot of insight into these kinds of events. What we observed there seems to match our theory. This reddit discussion was quite helpful as well in figuring this out, in particular this comment by /u/fryguybob.

This doesn't mean that a TMVar is always faster than an MVar; it really depends on your use case. We tried applying the same trick in a few other places, but none of them saw a similar improvement.
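To make the swap concrete, here is a minimal sketch of the pipe-passing loop with a TMVar. It is not our production code: ToyPipe is a hypothetical, heavily simplified stand-in for conduit's Pipe, and runToyPipe uses a plain MVar plus killThread as a base-only replacement for Async.waitAnyCancel. Exceptions in workers are not handled here.

```haskell
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Concurrent.STM
import Control.Monad (replicateM)

-- Hypothetical, simplified stand-in for conduit's Pipe (not the real type).
data ToyPipe r
  = ToyWork (IO ()) (ToyPipe r) -- a parallel work unit and the rest of the pipe
  | ToyEffect (IO (ToyPipe r))  -- a sequential step, run inside the critical section
  | ToyDone r                   -- the final result

-- Same shape as runWorker, with the MVar operations replaced by TMVar ones.
runToyWorker :: TMVar (ToyPipe r) -> IO r
runToyWorker pipeVar = loop
  where
    -- Take the pipe, so that no one else can.
    loop = atomically (takeTMVar pipeVar) >>= withPipe

    withPipe p = case p of
      ToyWork w rest -> do
        atomically (putTMVar pipeVar rest) -- hand the pipe to another worker
        w                                  -- evaluate the work without the pipe
        loop
      ToyEffect next -> next >>= withPipe
      ToyDone r -> pure r

-- The first worker that reaches ToyDone delivers the result;
-- the workers still waiting for the pipe are killed afterwards.
runToyPipe :: Int -> ToyPipe r -> IO r
runToyPipe numThreads pipe = do
  pipeVar <- newTMVarIO pipe
  resultVar <- newEmptyMVar
  threads <- replicateM numThreads $
    forkIO (runToyWorker pipeVar >>= putMVar resultVar)
  result <- takeMVar resultVar
  mapM_ killThread threads
  pure result
```

As in the real evaluator, the pipe itself is responsible for not reaching its final state before all outstanding work has completed, for example with a ToyEffect step that blocks until a shared counter reaches the expected value.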

Earlier implementations

We went through a bunch of iterations before we finally came up with the shown pipe-passing implementation of runConduitWithWork.

Initially we used a separate conduit:

runWork :: ConduitT (WorkOr a) a IO ()

This runWork conduit took care of consuming all the parallel work units, which allowed us to run the entire thing in the regular runConduit. Everything fits neatly in the Conduit framework, quite elegant!

Of course runConduit will evaluate the whole conduit structure in a single thread. The obvious way that runWork then evaluates parallel work is by putting it in some shared queue, where other threads can pick it up to evaluate it. This immediately gives some additional overhead because all work goes through the queue and has to be moved between different threads.

You also need to make some other tricky decisions, for instance:

  • A super long queue can be bad for GC so we want to limit it. What does the conduit do when the queue is full?
  • What if we want to run this workload on a single thread?

You could probably have runWork just block when the queue is full and get reasonable behaviour. Within our job evaluation system however, we want to strictly cap how many threads each job gets to use and this made everything tremendously complicated. To give you a rough idea of all the moving parts involved: We took the 'main' thread (the one that's running the conduit) and the 'extra' threads and tried to block/unblock them such that the total number of unblocked threads always matched the number of threads that were assigned to the job. When the queue was full, we would block the main thread (by doing a blocking takeMVar in the evaluation of the runWork conduit) and then unblock an extra worker thread. Conversely when the queue grew too small one of the extra worker threads would block and ensure that the main thread became unblocked.

The overhead of all of that was quite large, but for jobs that were a good fit for parallelization it was still an improvement. Once we switched to the current pipe-passing implementation, the performance difference was quite dramatic. Below we see a comparison between the load on actual production with runWork (the thin green line) and pipe-passing (the fat green line):

[Figure: Intermediate improvements]

No task queue then?

Most parallel evaluation systems, including the runWork conduit, involve a queue of tasks somewhere. Our pipe-passing implementation seems to get away without one, how does that work?

The conduit is the queue!

If you actually look at the work producers from the previous blog (sinkItems and the different aggregations), they all produce the work on demand. For instance if we look closely at the yieldAndSink example:

yieldAndSink :: [Item] -> ConduitT i (WorkOr o) IO [Item]
yieldAndSink items = yieldParallel 100 items .| sinkItems

When a worker thread pulls on this conduit, the yieldParallel will yield a single ParallelStream and sinkItems will promptly convert it to a WOWork parallel work unit. This all happens within the worker thread, which then proceeds to evaluate the work unit and another worker thread gets a turn with the conduit.

The aggregations work in a similar fashion, producing work on demand so that no queue is necessary. Unlike the yieldParallel/sinkItems combination, they sometimes can't yet produce any work because they have to wait for other work to complete.

One interesting case is if you have a component that produces both WOWork and other things. For example you can construct a conduit that converts a bunch of parallel streams to a single sequential stream of items. This sequentialize is just like sinkItems .| Conduit.yieldMany, but could already start yielding items before it has received all the streams.

sequentialize
  :: ConduitT (WorkOr ParallelStream) (WorkOr Item) IO ()

This has to deliver WOWork for the workers to evaluate, but once the results of the workers start coming in it has to choose between yielding items or yielding work. You want to provide enough work to keep all the workers busy, but not so much that you end up buffering huge amounts of items. In our implementation, we limit the total number of slots that it has for receiving work results. When all the slots are taken, it will first yield items from the first slot before yielding new work to fill a new slot.

Also known as: continuation-stealing

If we check the literature, our pipe-passing implementation behaves similarly to continuation-stealing schedulers, while our old implementation was similar to the child-stealing variation.

Schmaus et al. in Nowa: A Wait-Free Continuation-Stealing Concurrency Platform note that continuation-stealing schedulers require fewer stack switches because "if the continuation is not stolen, which is the typical case, the worker can proceed without a stack switch". Figure 3 in that paper shows how that works. This is similar to what A. Robison already wrote in A Primer on Scheduling Fork-Join Parallelism with Work Stealing:

The previously mentioned irksome semantic difference between child-stealing and continuation-stealing is most evident when no thief is available. With continuation stealing, the order of execution becomes identical to as if the spawn and sync annotations were ignored.

Robison also notes that child-stealing "requires space proportional to n to hold the unexecuted tasks", but with continuation-stealing "the number of extant tasks is no more than P, the actual number of threads executing the loop in parallel". That's indeed what we observed as well, with the task queue disappearing.

Estimating the appropriate number of threads

In the pipe-passing implementation of our evaluator the pipe is a shared resource. Only one thread at a time can use/modify the pipe. The time between taking and returning the pipe can be called a critical section in concurrent programming parlance. So in other words, only one thread can be in the critical section at any point in time.

The time that a thread spends in the runWorker loop can then be roughly divided in three parts:

  • wait%: fraction of time spent waiting for the pipe (because some other thread has it)
  • crit%: fraction of time spent in the critical section, where it can use and modify the pipe
  • par%: fraction of time spent evaluating parallel work (without the pipe)

If we measure these numbers we can make a pretty good estimate of how many threads are actually useful. Intuitively, if a single thread spends 1% of time in the critical section you can probably throw lots of threads at the problem. If a single thread has 90% crit time there is only 10% remaining time where other threads can use the pipe, so any additional threads will just spend most of their time waiting while increasing overhead. In the setup that we use at Channable we limit the total number of threads that all running jobs can use, so a thread that spends most of its time waiting will directly reduce our job throughput.
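This intuition can be backed by a rough upper bound. It is only a sanity check under a simplifying assumption (the critical-section work is fixed and fully serialized), not the model discussed further down. If a single thread spends a fraction crit of its time in the critical section, then no number of threads N can push the speedup beyond:

```latex
\mathrm{speedup}(N) \;\le\; \min\!\left(N,\ \frac{1}{\mathrm{crit}}\right)
\qquad\text{e.g.}\quad
\mathrm{crit}=0.01 \Rightarrow \mathrm{speedup} \le 100,
\quad
\mathrm{crit}=0.15 \Rightarrow \mathrm{speedup} \le 6.67,
\quad
\mathrm{crit}=0.90 \Rightarrow \mathrm{speedup} \le 1.11
```

In practice the achievable speedup stays below this bound, because threads also contend for the pipe well before the critical section is fully saturated.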

These crit/wait/par time percentages tend to change a lot during the run time of a job. When the job contains a few aggregations they tend to separate it into different sections, each of which allows for a different amount of parallelization. It's therefore useful to continuously measure them and adjust the number of used threads accordingly.

A neat trick to get a good running estimate for these is by using an exponential moving average. This can be used to turn a simple on/off signal into an average percentage, and can be implemented efficiently as a filter. Unlike a simple moving average we don't have to remember any previous values, so no ring buffer or anything like that is necessary.
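As an illustration, the filter can be sketched as follows. This is a simplified version that assumes the signal is sampled at fixed intervals; the names emaStep and signalFraction, and the smoothing factor alpha, are made up for this example and are not taken from our code base.

```haskell
import Data.List (foldl')

-- One filter step: move the previous estimate a fraction alpha
-- (between 0 and 1) towards the new sample. Only the previous
-- estimate needs to be remembered, no history of samples.
emaStep :: Double -> Double -> Double -> Double
emaStep alpha previous sample = previous + alpha * (sample - previous)

-- Turn an on/off signal (for example "was this thread in the critical
-- section at this sample?") into a running fraction between 0 and 1.
-- The estimate starts at 0.
signalFraction :: Double -> [Bool] -> Double
signalFraction alpha = foldl' step 0
  where
    step estimate on = emaStep alpha estimate (if on then 1 else 0)
```

A small alpha gives a smooth but slowly reacting estimate; a large alpha follows phase changes in the job quickly but is noisier.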

To actually turn these numbers into an estimation of how effectively our threads can be used, we used the model from the paper "Modeling Critical Sections in Amdahl’s Law and its Implications for Multicore Design" by Stijn Eyerman and Lieven Eeckhout. In particular we used formula 11 and by fixing some of the variables we ended up with a neat function:

modeledSpeedup
  :: Double  -- Fraction of time spent in critical section (for a single thread)
  -> Int     -- Number of threads
  -> Double  -- Estimated speed up compared to using a single thread

Then if we measure a crit% of 15% we can estimate the speedup for any number of threads, and the difference is then how effective each additional thread is:

| N | Speedup | Diff  | Note                              |
|---|---------|-------|-----------------------------------|
| 1 | 100%    | +100% | 1st thread is 100% effective      |
| 2 | 196%    | +96%  | 2nd thread is 96% effective       |
| 3 | 287%    | +91%  |                                   |
| 4 | 375%    | +88%  |                                   |
| 5 | 426%    | +51%  | 5th thread is only 51% effective! |
| 6 | 453%    | +27%  |                                   |
| 7 | 475%    | +22%  |                                   |
| 8 | 492%    | +18%  |                                   |

This estimation isn't perfect because we're only accounting for the things we're measuring, and not everything else that's happening on the server (even within the server process). So far it's been working surprisingly well though.

At the moment we use a simple cap of 70% on thread effectiveness: a job only gets an additional thread if that thread is estimated to be at least 70% effective. So according to the table, with a crit% of 15% the job will try to use 4 threads. The threads are currently dealt out on a first-come, first-served basis, so we need the cap to prevent a single job from wasting all our CPU resources in an inefficient manner while starving other jobs.
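The selection rule can be sketched like this. The function chooseThreads is a hypothetical name for illustration, and tableSpeedup simply looks up the rounded speedups from the table above instead of evaluating the real modeledSpeedup, whose formula is not reproduced here.

```haskell
-- Speedups from the table above (crit% = 15%); index 0 is one thread.
articleSpeedups :: [Double]
articleSpeedups = [1.00, 1.96, 2.87, 3.75, 4.26, 4.53, 4.75, 4.92]

-- Hypothetical stand-in for 'modeledSpeedup 0.15': a table lookup.
-- Zero threads give zero speedup; beyond the table we reuse the last entry.
tableSpeedup :: Int -> Double
tableSpeedup n
  | n <= 0 = 0
  | otherwise = articleSpeedups !! (min n (length articleSpeedups) - 1)

-- Keep adding threads while each additional thread is at least 'cap'
-- effective, up to 'maxThreads'. Always returns at least one thread.
chooseThreads :: Double -> Int -> (Int -> Double) -> Int
chooseThreads cap maxThreads speedup =
  max 1 (length (takeWhile (>= cap) gains))
  where
    -- Effectiveness of the n-th thread: the speedup it adds.
    gains = [speedup n - speedup (n - 1) | n <- [1 .. maxThreads]]
```

With these numbers, chooseThreads 0.70 8 tableSpeedup stops at 4 threads, because the 5th thread would only be about 51% effective.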

A different approach would be a central system that decides which job can use the core most efficiently, based upon factors like priority, current workload, existing cores, etc. Jobs would just use as many threads as possible (even at low effectiveness) but if another job comes along that can make better use of those threads we would move them over. This is something we still would like to implement in the future.

Below we see an example of the lifetime of a single job that takes about 60 seconds. The blue line is how many threads it's using, and the red line is the total CPU time that the whole server process uses while running just this job. The live measurements of crit%/par%/wait% and the speedup model allow the job to quickly jump to an appropriate number of threads while the job is going through different phases that allow different amounts of parallelism.

Conclusion

Our earlier blog posts showed that we can pass along computed values and parallel work units in the same stream, by using a ConduitT with the WorkOr type. We were able to implement a variety of actions in this way.

In this post, we showed that we can not only express all these different behaviours, but that we can also run them quite efficiently with our pipe-passing implementation.

We're very interested to hear if you know about libraries that do something similar to this, either in Haskell or in another language. Let us know on reddit or hackernews.

Next up

In the fourth and final blog post in this series, we will explain how we implemented conditionals. More precisely, we show how we can efficiently send only values that match a condition through an aggregation. As a preliminary to conditionals we need to ensure that evaluation is done in a non-blocking fashion, by allowing the conduit to signal the evaluator that no work is available at the moment.

Go read it now: Parallel streaming in Haskell: Part 4 - Non-blocking evaluation and conditionals

You can discuss this blog post on hackernews and on reddit.

1: Actually we use a TMVar instead of MVar, as is discussed in the next section, but using MVar makes the example code slightly easier to read.
2: In reality, threads run on capabilities. Those are not precisely the same as CPUs, but for our purposes we can pretend that they are.

Yorick Sijsling, Software Development
Joris Burgers, Software Development