Parallel streaming in Haskell: Part 4 - Conditionals and non-blocking evaluation

January 25, 2023

Here is the final blog post about the parallel streaming evaluator we use at Channable, where we use Haskell with the Conduit library to produce both result values and parallel work units in a single stream.

We will assume that you've read the previous three parts, so go ahead and do that now!

In this blog post we will explain how we implemented conditionals. More precisely, we show how we can efficiently send only values that match a condition through an aggregation. As a preliminary to conditionals we need to ensure that evaluation is done in a non-blocking fashion.

Non-blocking evaluation

Sometimes during evaluation we have to wait on parallel work that is currently being evaluated. Examples where this might occur are:

  • during the joining phase of aggregations we know which input blocks we want to join together but those input blocks may not have been produced yet
  • if a sinkItems has produced work to consume every incoming stream, it can't do anything until all results are available
  • in the sequentialize variation we can have similar situations, where it can't yield any items until an incoming stream has completed

The most obvious place to do the waiting is within the top-level conduit. That's precisely what we did with the takeMVars in the sinkItems implementation from the first blog post. Here's the code snippet:

        -- A 'Nothing' means that all the conduits before this one have finished.
        -- We now gather all the chunks, concatenate, and return the list of items.
        -- The 'takeMVar' ensures that we wait for all the parallel work to complete.
        Nothing ->
          fmap concat $ traverse (liftIO . takeMVar) chunkVars

A real problem with this is that the consumer of this conduit can't detect when such a blocking wait occurs. A function like our runConduitWithWork might observe that it takes a long time to take a step in the conduit, but it's hard to see if it's doing useful work or if it's just blocking. The ability to see that a work producer can't do anything at the moment is crucial in our implementation of conditionals, where we'll have two branches and if one blocks we may want to proceed working on the other branch. This will be further explained in the next chapter.

Our solution is to have a very strict non-blocking policy, and instead allow our conduits to yield a special signal that tells the consumer that it can't make progress. For this purpose we add a new constructor to our WorkOr type:

data WorkOr a
  = WOWork (IO ())   -- ^ A unit of work, to be executed at most once.
                     -- Different work units can run in parallel
  | WOValue a
  | WONothingYet     -- ^ NEW: Signal that we can't make progress now
                     -- because we're waiting on parallel work to complete

The takeMVar call is now forbidden with our non-blocking policy, but we can write a drop-in replacement for the liftIO . takeMVar combination that does what we need:

nonBlockingTakeMVar
  :: MVar a -> ConduitT i (WorkOr o) IO a  -- Same type as `liftIO . takeMVar`
nonBlockingTakeMVar var =
  liftIO (tryTakeMVar var) >>= \case   -- Non-blocking read of the MVar
    Nothing -> do                 -- No value yet,
      Conduit.yield WONothingYet  -- tell consumer that we can't do anything,
      nonBlockingTakeMVar var     -- and then keep trying
    Just x -> pure x

This looks a bit like a busy wait with repeated tryTakeMVar calls, but the yield in between makes it slightly different. A carefully written consumer can observe when the WONothingYet is being yielded and gets to decide how to continue. In our runConduitWithWork implementation we just use a Control.Concurrent.yield (unrelated to Conduit.yield) to actively allow any other waiting threads to run before we continue our loop. We used to have a small threadDelay of 100 microseconds here, but that was inefficient. It cost more CPU time and the job ran slower.

      HaveOutput pipe WONothingYet -> do
        Control.Concurrent.yield  -- Let any other waiting threads run first
        withPipe pipe
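
To see the polling pattern in isolation, here is a minimal sketch that only uses base and leaves Conduit out of the picture. The names pollUntilFilled and demo are our own illustration and not part of the evaluator. The retry counter stands in for the WONothingYet signals a consumer would observe.

```haskell
import Control.Concurrent (forkIO, yield)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, tryTakeMVar)

-- | Hypothetical helper: wait for an MVar without ever blocking on it.
-- Returns the value and the number of "nothing yet" rounds we went through.
pollUntilFilled :: MVar a -> IO (a, Int)
pollUntilFilled var = go 0
  where
    go retries = do
      result <- tryTakeMVar var        -- non-blocking read
      case result of
        Just x  -> pure (x, retries)
        Nothing -> do
          yield                        -- let other runnable threads go first
          go (retries + 1)

-- | Another thread fills the MVar while we poll for the result.
demo :: IO Int
demo = do
  var <- newEmptyMVar
  _ <- forkIO (putMVar var 42)
  fst <$> pollUntilFilled var
```

The number of retries depends on scheduling, so only the returned value is deterministic.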

Another aspect of the non-blocking policy is that we never block within a parallel work unit or within a parallel stream. In other words, we only yield a WOWork parallel work unit or a ParallelStream when they can be evaluated immediately, in full, without blocking. So for example when we're in the joining phase of an aggregation, we only yield a work unit to join two blocks once the required blocks are available (as opposed to just yielding the work and then blocking during evaluation of that work). This gives us two important properties:

  • We always make progress, even when the number of WOWork parallel work units that we run at the same time is limited. You don't want all your threads to be blocked on results from a work unit that will never run because all threads are taken.
  • The evaluator can accurately measure how much work actually happens and how long we're waiting. This is necessary to estimate the optimal number of threads, as discussed in Parallel streaming in Haskell: Part 3 - A parallel work consumer.
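
As a rough illustration of "only yield work that can run in full", consider the joining example. The sketch below is an assumption-laden simplification: joinWhenReady and the use of one MVar per block are hypothetical, and the real aggregation code is structured differently.

```haskell
import Control.Concurrent.MVar
  (MVar, newEmptyMVar, newMVar, putMVar, readMVar, tryReadMVar)

data WorkOr a
  = WOWork (IO ())
  | WOValue a
  | WONothingYet

-- | Hypothetical helper: produce a work unit that joins two blocks, but only
-- once both input blocks are available. The work unit itself never waits.
joinWhenReady
  :: ([a] -> [a] -> [a])  -- how to join two blocks
  -> MVar [a]             -- left input block
  -> MVar [a]             -- right input block
  -> MVar [a]             -- placeholder for the joined block
  -> IO (WorkOr b)
joinWhenReady merge leftVar rightVar outVar = do
  left  <- tryReadMVar leftVar   -- non-blocking peek, leaves the value in place
  right <- tryReadMVar rightVar
  pure $ case (left, right) of
    (Just l, Just r) -> WOWork (putMVar outVar (merge l r))
    _                -> WONothingYet  -- inputs not ready, so no work is handed out
```

Because the inputs are checked before the work unit is created, a thread that picks up the WOWork can always run it to completion.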

Conditionals

In our tool, users can specify conditionals. Let's say we have a conditional like this:

IF length(.description) > 10 THEN
  action1
ELSE
  action2

The intent is quite clear: When the length of the description field is more than 10 we should apply action1 and otherwise we should apply action2. This works intuitively for actions like MAP and FILTER where we can look at every individual item, check the conditional, apply the corresponding action and then yield the modified item if it was not filtered out.

Other actions, for instance DEDUPLICATION or SORT, can't be applied to an individual item, but we do allow them in conditionals. The exact interpretation may therefore differ from what programmers expect, because we actually:

  • Process all items to see if they match the condition
  • All items that match the condition will be sent to action1, which may or may not immediately yield output items
  • All items that don't match the condition will be sent to action2, which may or may not immediately yield output items

This means that we need to partition the items based on the condition, evaluate the 2 branches and recombine the results. All of this needs to have a deterministic order and needs to be done as fast as possible.

One can think of the behavior as the following code:

conditional condition thenActions elseActions = \items -> do
  let (trueItems, falseItems) = partition condition items
  items1 <- thenActions trueItems
  items2 <- elseActions falseItems
  pure (items1 <> items2)
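
To make the semantics concrete, here is a small runnable version of that model on plain lists. The names conditionalModel and example, the type signature, and the sample data are our own additions for illustration.

```haskell
import Data.Char (toUpper)
import Data.List (partition, sort)

-- | List-based model of a conditional: partition, run both branches, recombine.
conditionalModel
  :: Monad m
  => (a -> Bool)      -- condition
  -> ([a] -> m [a])   -- then-branch
  -> ([a] -> m [a])   -- else-branch
  -> [a] -> m [a]
conditionalModel condition thenActions elseActions items = do
  let (trueItems, falseItems) = partition condition items
  items1 <- thenActions trueItems
  items2 <- elseActions falseItems
  pure (items1 <> items2)

-- | Long descriptions are upper-cased (a streaming-style action),
-- all the others are sorted (an aggregating-style action).
example :: IO [String]
example =
  conditionalModel
    (\description -> length description > 10)
    (pure . map (map toUpper))
    (pure . sort)
    ["a long description", "short", "another long one", "abc"]
```

Running example gives ["A LONG DESCRIPTION", "ANOTHER LONG ONE", "abc", "short"]: the sort only sees the items that failed the condition, and all then-branch results come first.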

To precisely explain how conditionals are implemented, we'll consider 2 categories of actions that are allowed in conditionals:

  • Streaming actions like MAP and FILTER process one item at a time. Any input that they consume is directly translated into the corresponding output (or a lack of an output for a filter), so they're not allowed to remember some items and yield them later. If we pull on the output of the action we expect it to pull on its input, without any buffering in between.
  • Aggregating actions consume all the input items before they start yielding any output. We've discussed aggregations in the previous blog post, and it includes things like deduplication, sorting and grouping. The items are buffered in some data structure.

Based on the actions contained in a conditional we determine if this is an aggregating conditional or a streaming conditional, and we pick an evaluation strategy accordingly.

Aggregating conditionals

We use this regime if at least one of the branches contains an aggregating action. The main property of an aggregating action is that it already has an implicit buffer. We'll use this "free" buffer (it doesn't cost us anything extra) to make sure the results are deterministic. We assume that there is always at least one aggregating action in the else-branch. If only the then-branch contains an aggregating action, we rewrite the conditional by negating the condition and swapping the branches, so that the aggregating action ends up in the else-branch:

IF not condition THEN

We now have a conditional where the else-branch contains at least one aggregation action. Both branches can additionally contain more streaming, aggregating, or any other actions. Our goal now is to write a function like so:

  :: (Item -> IO Bool)  -- ^ The condition
  -> ConduitT (WorkOr ParallelStream) (WorkOr ParallelStream) IO ()    -- ^ The then-branch
  -> ConduitT (WorkOr ParallelStream) (WorkOr ParallelStream) IO ()    -- ^ The else-branch, MUST be aggregating
  -> ConduitT (WorkOr ParallelStream) (WorkOr ParallelStream) IO ()

The conditional takes WorkOr ParallelStreams as the input, so there are a few cases that we need to deal with. The simplest case is when we get a WOWork parallel work unit: we can simply pass it along to the output without having to go through the conditional branches[1]. This is slightly more efficient, but more importantly it prevents duplication of work.

When we get a ParallelStream instead, we want to construct two ParallelStreams. By applying the condition to all the items we build one ParallelStream with just the values for the then-branch and another with just the values for the else-branch.

We effectively have three branches:

  • The pass-around branch for WOWork units from the input
  • The then-branch that produces WorkOr ParallelStream outputs
  • The else-branch also produces WorkOr ParallelStream outputs

The outputs of all of these branches are combined by simply interleaving them in whatever order the outputs come. For parallel work units this is certainly valid, because they can be executed in any order. The order in which we evaluate work doesn't influence the results.

For the ParallelStreams we have to be very careful though, because we want a deterministic ordering that is consistent between runs and that doesn't rely on specifics like chunk sizes. That's why we require the else-branch to have at least one aggregation, because then we always get all the streams from the then-branch before we get the first stream from the else-branch. The deterministic order is then that all values where the condition applies come before the values where the condition doesn't apply. Apart from that, items are ordered the same as in the input.

Zipping and WONothingYet

Combining the three streams is easier said than done. The logical route would be to use a ZipConduit for this. A ZipConduit duplicates each input to multiple conduits and then combines the outputs. It interleaves the outputs in a left-biased manner, meaning that if multiple conduits have an output available (they're in the HaveOutput state), the output from the leftmost conduit is used first.

The snag with ZipConduit is that we have WONothingYets to deal with. Each of the three branches can potentially yield a WONothingYet to indicate that they can't do anything right now, but if another branch can still make progress we don't want to pass this WONothingYet to the output.

If we just look at how the then and else branches need to be combined, we already get a bunch of interesting cases:

then-branch        else-branch        Result
Has WONothingYet   Has WONothingYet   Forward a WONothingYet
Has WOWork         Has WONothingYet   Forward the WOWork
Has WONothingYet   Has WOWork         Forward the WOWork
Has a stream       Has WONothingYet   Forward the stream (!)
Has WONothingYet   Has a stream       Forward a WONothingYet (!)
Is Done            Has WONothingYet   Forward a WONothingYet
Has WONothingYet   Is Done            Forward a WONothingYet

Note that WOWork is always forwarded if it is available, regardless of the state of the other branch. If the then-branch can deliver a stream while the else-branch is blocked, we can just forward that stream. If it's the other way around, we can't forward the stream from the else-branch yet: to keep our deterministic ordering, the values from the then-branch have to come first. We therefore wait for enough parallel work to complete, and signal that to the evaluator by yielding a WONothingYet.
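
The cases listed above can also be written down as a small pure function. This is only a model for reasoning about them: the BranchState and Forwarded types and the decide function are hypothetical names, and the real implementation works directly on the Pipe constructors.

```haskell
-- | Simplified view of what one branch currently offers (illustrative names).
data BranchState = HasNothingYet | HasWork | HasStream | IsDone
  deriving (Eq, Show)

-- | What the combined conduit forwards to its consumer.
data Forwarded = ForwardNothingYet | ForwardWork | ForwardStream
  deriving (Eq, Show)

-- | First argument: then-branch, second argument: else-branch.
-- Combinations that are not in the list of cases yield 'Nothing'.
decide :: BranchState -> BranchState -> Maybe Forwarded
decide thenState elseState = case (thenState, elseState) of
  (HasNothingYet, HasNothingYet) -> Just ForwardNothingYet
  (HasWork,       HasNothingYet) -> Just ForwardWork
  (HasNothingYet, HasWork)       -> Just ForwardWork
  (HasStream,     HasNothingYet) -> Just ForwardStream      -- then-values may go first
  (HasNothingYet, HasStream)     -> Just ForwardNothingYet  -- else-values have to wait
  (IsDone,        HasNothingYet) -> Just ForwardNothingYet
  (HasNothingYet, IsDone)        -> Just ForwardNothingYet
  _                              -> Nothing
```

The two cases marked with (!) are the asymmetric ones: a stream from the then-branch passes, a stream from the else-branch is held back.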

To implement this logic, we took the existing zipConduitApp from the ZipConduit implementation and added the special cases relating to WONothingYet. Some of the cases listed above already happen to match the default behaviour.

There are 5 special cases where we need to override the left-first bias for WONothingYet:

    go x@(NeedInput _ _)           (HaveOutput y WONothingYet) = HaveOutput (go x y) WONothingYet
    go x@(Done _)                  (HaveOutput y WONothingYet) = HaveOutput (go x y) WONothingYet
    go (HaveOutput x WONothingYet) y@(NeedInput _ _)           = HaveOutput (go x y) WONothingYet
    go (HaveOutput x WONothingYet) y@(Done _)                  = HaveOutput (go x y) WONothingYet
    go (HaveOutput x WONothingYet) (HaveOutput y WONothingYet) = HaveOutput (go x y) WONothingYet

We also included some other cases we modified. These cases are mainly to make the left-first assumption for values explicit.

    -- Only produce values for the else-branch once the then-branch is done
    go x@Done{} (HaveOutput y o) = HaveOutput (go x y) o
    -- If the aggregating action is producing outputs, all values should have been consumed.
    -- Otherwise, it is not a pure aggregating action.
    go x@(NeedInput _ _) (HaveOutput _ (WOValue _)) =
      error "Invalid case, the left required input but the right was already producing values."
    -- We do allow `WOWork` from the `else`-branch to continue, regardless of what is on the `then`-branch.
    go x (HaveOutput y o@(WOWork _)) = HaveOutput (go x y) o

The same implementation can also be used to combine the outputs of the then- and else-branches again with the outputs from the pass-around branch.

A closer look at splitting

The diagram above shows a 'Is it a WOValue?' block that splits the stream and sends different values to the branches.

In principle, a ZipConduit and also our variation on it will simply duplicate the incoming values to all the contained conduits. If we want to send WOValues to one conduit and WOWork/WONothingYet to another, we can send the same WorkOr a value to all conduits and apply filters within them so that only the relevant stuff is kept. You'd typically use functions like these:

keepOnlyWOValue :: ConduitT (WorkOr a) a IO ()
skipWOValue :: ConduitT (WorkOr a) (WorkOr b) IO ()  -- Generic 'b' guarantees that no WOValue's are yielded

If we really zoom in on the 'Is it a WOValue?' split, it looks more like this, where the split is implemented using our custom zipConduitApp.

Buffering results

For the 'Split on condition' block we can use a similar strategy, but it's a bit more complicated. We explained that from a single incoming stream we construct two separate streams for the two conditional branches.

The naive approach would be to let the two branches modify the incoming streams by applying a Conduit.filter. Something like this:

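badConditional cond thenBranch elseBranch =
    ... -- zip the thenBranch' and elseBranch'
  where
    -- Both branches run the same incoming stream and keep only their own items
    thenBranch' = Conduit.map (fmap (\stream -> stream .| Conduit.filterM cond)) .| thenBranch
    elseBranch' = Conduit.map (fmap (\stream -> stream .| Conduit.filterM (fmap not . cond))) .| elseBranch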

This might work, but the tricky part is that the incoming ParallelStream doesn't really contain the items, but instead it's a stream that can produce the items. Producing these items can be an expensive effort, because this might include all kinds of operations from upstream. In an implementation like this both branches run the incoming stream, so we evaluate the entire upstream twice!

To prevent this, we use a fixConditionalStreams function that runs each incoming ParallelStream just once, and produces two lists of items ([Item], [Item]), partitioned on whether the condition holds or not.

fixConditionalStreams
  :: (Item -> IO Bool)  -- ^ Condition
  -> ConduitT (WorkOr ParallelStream)
              (WorkOr ([Item], [Item]))  -- Items partitioned on condition
              IO ()

Then within the conditional branches we can pick the [Item] list that we need, and trivially convert it back to a ParallelStream by using Conduit.yieldMany.
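
The partitioning step itself boils down to a monadic variant of partition. Below is a minimal sketch on plain lists. partitionM and countingCondition are names we made up for this example; the counter is only there to show that the condition runs exactly once per item.

```haskell
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

-- | Partition a list with an effectful condition. The condition is evaluated
-- once per item, in input order, and both result lists keep the input order.
partitionM :: Monad m => (a -> m Bool) -> [a] -> m ([a], [a])
partitionM condition = foldr step (pure ([], []))
  where
    step item rest = do
      matches <- condition item
      (yes, no) <- rest
      pure (if matches then (item : yes, no) else (yes, item : no))

-- | Example condition that counts how often it is called.
countingCondition :: IORef Int -> Int -> IO Bool
countingCondition counter n = do
  modifyIORef' counter (+ 1)
  pure (even n)
```

Partitioning [1 .. 6] with countingCondition yields ([2, 4, 6], [1, 3, 5]) after exactly six calls, so neither branch has to run the upstream or the condition again.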

Consuming these ParallelStreams shouldn't be done within the top level conduit, because then we can't take advantage of their ability to be run on multiple threads. Instead we do something similar to the sequentialize that was mentioned earlier, where a small buffer is used to store the evaluated results from several ParallelStreams. Every ParallelStream that comes in will be given a placeholder in the buffer. After this, we create a new IO () action that evaluates the stream and places the result in the placeholder.

The IO () is forwarded as WOWork, while the ParallelStream is discarded. Once the WOWork is executed, the result is put in the placeholder MVar and we can yield a ParallelStream that just reads from the existing placeholder. Duplicating this new ParallelStream is cheap and safe, as the actual work is already done by the WOWork.

The idea is roughly similar to these functions (but the code for the true implementation is a bit different):

streamToWork :: MVar [Item] -> ParallelStream -> WorkOr a
streamToWork placeholder stream = WOWork $
  Conduit.runConduit (stream .| Conduit.sinkList) >>= putMVar placeholder

readFromBuffer :: MVar [Item] -> ConduitT i (WorkOr ParallelStream) IO ()
readFromBuffer placeholder = do
  items <- nonBlockingTakeMVar placeholder
  Conduit.yield $ WOValue $ Conduit.yieldMany items

The buffer is ordered, so we only return a ParallelStream once the head of the buffer is filled. That way, we preserve the order. We can produce multiple WOWork units to fill the buffer, but the ParallelStream will only be produced for the head of the buffer.

When pulling WOWork, we have a preference to produce items that have already been computed. So if we have a filled placeholder, we produce a new ParallelStream for that placeholder instead of pulling a new ParallelStream from upstream. That way, our intermediate buffer will remain short.
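
The "only the head of the buffer" rule can be modelled with a list of placeholders. This is a simplified sketch: popHead is a hypothetical helper, and a plain list stands in for whatever buffer structure the real implementation uses.

```haskell
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, tryTakeMVar)

-- | Hypothetical helper: take the result at the head of the ordered buffer,
-- but only if it has been filled. Later placeholders are never skipped ahead
-- to, which preserves the input order.
popHead :: [MVar a] -> IO (Maybe (a, [MVar a]))
popHead [] = pure Nothing
popHead (placeholder : rest) = do
  result <- tryTakeMVar placeholder   -- non-blocking check of the head only
  pure (fmap (\items -> (items, rest)) result)
```

If the second placeholder is filled before the first, popHead returns Nothing until the first one is filled as well, after which the results come out in their original order.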

Note that the fixConditionalStreams function will yield WOWork units to consume the streams and can also yield WONothingYets when it needs to wait for that work to complete (though it tries to prevent such occurrences). The framework with the WorkOr propagation neatly covers this use case. Looking back at the diagram for the branching of our conditionals, we do have to insert the fixConditionalStreams before the 'Is it a WOValue?' split. Some minor modifications need to be made to pick the right [Item] list, but the structure stays mostly the same.

Streaming conditionals

If all actions in both branches are streaming actions like MAP and FILTER, the whole conditional can become a streaming action as well. This is mostly done as an optimization, as we don't need to buffer any results during the evaluation. This can only be done because we know that every action in the conditional works on separate items, instead of on a collection of items. We use this property to modify the items in a streaming fashion, instead of all at once, as previous results don't influence results of items that are still to come.

For each input item we can check the conditional, pass the value through the corresponding branch, and immediately yield an output item if it wasn't filtered out. This is the ideal and most efficient case, for which we have a specialized implementation.

For streaming conditionals, we use a ZipConduit to duplicate every item and send 1 copy to the then-branch and the other to the else-branch. The then-branch starts with a filter to just match those items that match the condition. The else-branch also has a filter, but with the inverse of the condition. To prevent us from evaluating the condition twice, we evaluate the condition once and store the result with its item. The filter on the then and else branch then only has to check a boolean.

Consider the following example:

IF condition THEN
  action1
ELSE
  action2

Assuming that both action1 and action2 are streaming actions we can convert this using the following Haskell:

streamingConditional
  :: (Item -> IO Bool)
  -> ConduitT Item Item IO ()
  -> ConduitT Item Item IO ()
  -> ConduitT Item Item IO ()
streamingConditional condition action1 action2 =
    -- Evaluate the condition once and store the result with its item
    Conduit.mapM (\i -> (i,) <$> condition i)
                .| getZipConduit (ZipConduit trueConduit <* ZipConduit falseConduit)
  where
    trueConduit, falseConduit :: ConduitT (Item, Bool) Item IO ()
    trueConduit = Conduit.filter snd .| Conduit.map fst .| action1
    falseConduit = Conduit.filter (not . snd) .| Conduit.map fst .| action2

Both trueConduit and falseConduit expect input items to be paired with a boolean. The trueConduit will only process items with a True, while falseConduit will only process False items. This function can be used to modify individual streams. It only works if every action in the branches is a streaming action. If any branch contains an aggregating action, we need to fall back to the aggregating conditional described earlier.

conditional
  :: (Item -> IO Bool) -- ^ The condition
  -> ConduitT Item Item IO () -- ^ The then-branch
  -> ConduitT Item Item IO () -- ^ The else-branch
  -> ConduitT (WorkOr ParallelStream) (WorkOr ParallelStream) IO ()
conditional condition action1 action2 =
  let mapStream parallelStream = parallelStream .| streamingConditional condition action1 action2
  in Conduit.map (fmap mapStream)  -- Relies on a Functor instance for WorkOr

The main difference between streaming conditionals and aggregating conditionals is the level of abstraction they work on. Conditionals normally work on all items, so we need to collect all streams and apply the condition once we have all of them. Therefore, aggregating conditionals work on entire streams and pass along instructions. If we use streaming, we know that we don't need all streams at once and we can push the conditional into the instruction, thereby applying the conditional in a streaming fashion.

If you like overcomplicated diagrams, we could represent it like this:

Conclusion

In this series of blog posts we've set up a system for parallel streaming evaluation.

We showed how we can pass along computed values and parallel work units in the same stream, by using ConduitT with the WorkOr type:

data WorkOr a
  = WOWork (IO ())
  | WOValue a
  | WONothingYet

This gives us a solid base, where we can write parallel streaming components like map, deduplicate, sequentialize and conditional aggregations. These all seamlessly connect together into a single parallel streaming pipeline. Running the pipeline is surprisingly simple with our pipe-passing implementation, which makes optimal use of the work-stealing scheduler that GHC's runtime system provides. By implementing everything in a non-blocking fashion we were able to accurately track how much work our threads are doing.

The system has proven to be very flexible and we're happily using this in our production systems. It can run multiple threads when it's useful, but can also scale back to efficient sequential evaluation (equivalent to just using runConduit) when it's running things that aren't parallelizable. It has shown to be an improvement over our earlier sequential implementations in every way.

What do you think about our approach? Do you have a better name for the WorkOr type? Let us know on Reddit or on Hacker News.

1: We could also choose to send WOWork through one of the conditional branches, as long as we don't send them through both because we don't want to receive the same work unit twice and evaluate it twice. The 'is it a WOValue?' split is cleaner and makes more sense to implement as a reusable function.

Yorick Sijsling, Software Development
Joris Burgers, Software Development