Parallel streaming in Haskell: Part 2 - Optimized parallel aggregations

January 11, 2023

This is part 2 in our series about the parallel streaming evaluator that we use at Channable, where we use Haskell with the Conduit library to produce both result values and parallel work units in a single stream. Part 1 explained the basics of our evaluator, why it's useful and some of the use cases for it. If you have not done so yet, read part 1 first:

Parallel streaming in Haskell: Part 1 - Fast, efficient, and fun!

In part 1 we've mainly worked with map and filter. They work on each item individually, so they can easily be applied on parallel streams as well.

It becomes more difficult when you need to interact with multiple items at the same time. We will refer to these types of actions as aggregations, mimicking the terminology from SQL databases. In other languages you would typically express these as a fold or reduce. In this blog post, we'll explain how we parallelized such aggregations.

Aggregations

We will use deduplication as the running example. Deduplication removes items that are duplicates of each other according to a certain expression (such as a field). In Haskell, the type signature of deduplication would look like

deduplicate :: Ord key => (Item -> key) -> [Item] -> [Item]

Our implementation also sorts the items along the way, to make it easier to write an O(n log n) algorithm. It's similar in spirit to nubSortOn.
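As a rough illustration of the sort-then-deduplicate idea, here is a minimal sequential sketch. It is not the production implementation: the Item record and its fields itemId and itemName are hypothetical stand-ins, and keeping the first item per key is just one of the possible policies.

```haskell
import Data.List (sortOn)

-- Hypothetical stand-in for the real item type.
data Item = Item
  { itemId   :: Int
  , itemName :: String
  } deriving (Eq, Show)

-- | Sort on the key, then drop every item whose key equals the key of the
-- item before it. 'sortOn' is stable, so of all items with the same key the
-- one that came first in the input is kept.
deduplicate :: Ord key => (Item -> key) -> [Item] -> [Item]
deduplicate key = dropRepeats . sortOn key
  where
    dropRepeats (x : y : rest)
      | key x == key y = dropRepeats (x : rest)
      | otherwise      = x : dropRepeats (y : rest)
    dropRepeats short = short
```

Sorting dominates the cost, so this runs in O(n log n); the pass that drops repeats is linear.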

We use deduplication in the examples, but we also support other actions such as:

  • Grouping: group items on a certain key, returning a new item with combined fields.
  • Sorting: sort items on a certain key.
  • Deduplication, remove duplicates: remove all items with duplicate keys.
  • Deduplication, keep a certain item: deduplicate, but keep the item with the highest or lowest value.

All of our aggregations work on the same principle: group all items by their key and join the items within a group in a certain way, possibly removing them. As a Haskell function, this would have the following type signature:

aggregation
  :: Ord key
  => (Item -> key) -- ^ The key on which to group the items
  -> (Item -> Item -> [Item]) -- ^ The function to combine 2 items, return 0, 1, or 2 items
  -> [Item] -- ^ The input items
  -> [Item] -- ^ The resulting items

In the examples below, we use numbers instead of concrete items. You can imagine that these numbers are the result of applying the key function (Item -> key) to each item.

High level algorithm

A naive attempt at a parallelized deduplication algorithm might work like this:

In this case, we were unlucky and the 3 showed up in both chunks and was therefore not recognized as a duplicate. Clearly, a deduplication function needs to look at all inputs to work, so it sounds like we should just accept that we can't do this in parallel.
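The failure mode is easy to reproduce. The sketch below is a deliberately naive version that deduplicates every chunk on its own and concatenates the results; the name naiveDeduplicate and the input numbers are made up for this illustration.

```haskell
import Data.List (nub)

-- | Naive approach: deduplicate every chunk in isolation and glue the
-- results together. Duplicates that span two chunks are never compared
-- with each other, so they survive.
naiveDeduplicate :: [[Int]] -> [Int]
naiveDeduplicate = concatMap nub
```

For the chunks [1, 3, 3] and [3, 4, 4] this returns [1, 3, 3, 4]: the duplicate 3 inside the first chunk is removed, but the 3 that appears in both chunks is kept twice.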

But never fear, we actually can. Mostly. The algorithm to do this is a classic divide-and-conquer style algorithm very similar to merge-sort. The high level approach contains the following steps:

  • Split: split the incoming data into 2 chunks.
  • Sort: sort all items inside their chunk by their deduplication key.
    This sorting step does add the constraint that the key we sort by is comparable, i.e. that we can use < and > on it.
  • Deduplicate: deduplicate each chunk on its own.
  • Join: join the 2 resulting blocks, removing any duplicates.

Deduplicating and sorting the individual chunks can be done in parallel, but joining them has to be sequential. For performance, it's best to do as much as possible in the dedup+sort step and as little as possible in the joining step.
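To make the parallel half of this concrete, here is a small sketch that runs a per-chunk function on a separate thread for every chunk and collects the results in input order. It uses plain forkIO and MVar from base purely for illustration; the evaluator from part 1 schedules work units instead, and the name parallelMap is an assumption of this example.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (evaluate)

-- | Apply 'f' to every chunk on its own thread and return the results in
-- the same order as the input chunks.
parallelMap :: ([Int] -> [Int]) -> [[Int]] -> IO [[Int]]
parallelMap f chunks = do
  vars <- mapM start chunks
  -- Taking the results in input order keeps the output deterministic,
  -- no matter which thread finishes first.
  mapM takeMVar vars
  where
    start chunk = do
      var <- newEmptyMVar
      _ <- forkIO $ do
        let result = f chunk
        -- Summing touches every element, so the chunk is fully computed
        -- on this thread before it is handed over.
        _ <- evaluate (sum result)
        putMVar var result
      pure var
```

Passing a sort+dedup function as f gives the parallel step; the join then runs on the collected results. To actually use multiple cores, compile with -threaded and run with +RTS -N. Without that the result is the same, but all chunks are processed on a single core.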

Joining efficiently

The key to joining efficiently is the Sort step, which happens in parallel. We join 2 lists that are already sorted, making the joining much easier. This is almost the same as the 'merge' step in a merge sort algorithm, with a small difference when duplicates occur.

We start the Join step with 2 lists of deduplicated items, sorted by the deduplication key. The result of this Join is also a sorted, deduplicated list. We look at the heads of both lists, written as l:ls and r:rs, where x:xs means a list with x as its first element and xs as everything else in the list. There are 3 cases:

  • l == r: We found a duplicate. We can decide to always return l, to always return r, or to use a special join function, in which case we return join(l, r). We recurse on ls and rs.
  • l < r: As l is smaller than r and r is the smallest element of r:rs (due to the sorting), we know that there is no occurrence of l in r:rs. We return l for the new list. As l is smaller than any item in ls or r:rs, this keeps the resulting list sorted. We recurse on ls and r:rs.
  • l > r: We continue with r as the head of the result list by the same logic as for l < r, but now with the operands reversed. We recurse on l:ls and rs.

This join step is O(n), where n is the sum of the lengths of both lists. Once either list is empty, we can append the other list to the end of the resulting list.
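The three cases translate almost directly into a recursive function. The sketch below is a minimal illustration rather than the production code; the names joinSorted, key and combine are made up here, with combine playing the role of the join function from the l == r case.

```haskell
-- | Join two lists that are both sorted on 'key' and contain no duplicate
-- keys. The result is again sorted and free of duplicate keys.
joinSorted :: Ord key => (a -> key) -> (a -> a -> a) -> [a] -> [a] -> [a]
joinSorted key combine = go
  where
    -- Once one side is empty, the rest of the other side is the tail of
    -- the result.
    go [] rs = rs
    go ls [] = ls
    go (l : ls) (r : rs) =
      case compare (key l) (key r) of
        EQ -> combine l r : go ls rs   -- duplicate: keep one combined item
        LT -> l : go ls (r : rs)       -- l cannot occur in r:rs
        GT -> r : go (l : ls) rs       -- mirror image of the LT case
```

Passing const as combine always keeps the left item and flip const always keeps the right one. For example, joinSorted id const [1, 3, 5] [2, 3, 6] gives [1, 2, 3, 5, 6].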

In the image below, we walk through 2 lists step by step. The comparison, as outlined above, is shown above each step. The arrow contains the value of the head of the list.

Scaling up

As you might imagine, making this work with only 2 streams is a bit pointless. However, we can use the Join step multiple times, as the result of Join has the same format as the processing of an individual chunk: a sorted and deduplicated list. If we have 4 chunks, the steps are:

  • Split the input into 4 chunks, chunk 0, chunk 1, chunk 2 and chunk 3.
  • Deduplicate and sort all chunks in parallel
  • Join chunk 0 and 1 and chunk 2 and 3 in parallel, resulting in chunk 01 and chunk 23 respectively
  • Join chunk 01 and chunk 23 to get the final result: chunk 0123

Scaling this up further is more of the same.

In general, the algorithm can be described as follows:

  • Receive parallel streams
  • Give every chunk an index and a layer. The layer starts at 0. The index is the position of the chunk in the layer, starting at 0.
  • Deduplicate and sort the streams in parallel to obtain the first layer
  • Once we have 2 deduplicated chunks that can be joined, join them, putting the result in a higher layer. If we have a leftover chunk in a layer, we join that with an empty list, resulting in the original chunk in the next layer.
  • Repeat until we have the final result.

We want the result to be deterministic, independent of timing issues due to multithreading and chunk sizes. For this we have to ensure that the join operation is associative (a <> (b <> c) == (a <> b) <> c) and that we always keep all the inputs in the right order. It's also worth noting that if we did ((a <> b) <> c) <> d we couldn't parallelize anything, but with (a <> b) <> (c <> d) we can evaluate a <> b and c <> d at the same time.

Together these requirements mean that we have fixed pairs of blocks (for joining), solely based on their position in the layer before. Actual evaluation of those joins can happen in any order due to multithreading.
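The fixed pairing can be sketched as a pure function that ignores scheduling altogether. This is an illustrative sketch with made-up names (joinLayer, joinAll), and it takes the join operation as a parameter. A leftover chunk is passed to the next layer unchanged, which has the same effect as joining it with an empty list.

```haskell
-- | Build the next layer: join neighbouring chunks pairwise, based only on
-- their position in the current layer. A leftover chunk moves up unchanged.
joinLayer :: (chunk -> chunk -> chunk) -> [chunk] -> [chunk]
joinLayer join (a : b : rest) = join a b : joinLayer join rest
joinLayer _ leftover = leftover

-- | Keep building layers until a single chunk remains. Returns 'Nothing'
-- when there are no chunks at all.
joinAll :: (chunk -> chunk -> chunk) -> [chunk] -> Maybe chunk
joinAll _ [] = Nothing
joinAll _ [c] = Just c
joinAll join cs = joinAll join (joinLayer join cs)
```

Using a join that only records its arguments makes the pairing visible: joinAll (\a b -> "(" ++ a ++ b ++ ")") ["a", "b", "c", "d", "e"] gives Just "(((ab)(cd))e)". All joins within one layer are independent of each other, which is what allows them to be evaluated at the same time.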

Step by step execution of aggregations

We've been talking about running these sort+dedup and joining steps in parallel. Remember though that an aggregation action by itself is just a conduit that will only do anything when the evaluator tries to pull data from it. In order to run this parallel work, the aggregation wraps each piece of work in a WOWork constructor and yields it to the next action. These work units get propagated to the end of the conduit, where runConduitWithWork ensures that the work gets evaluated.

So our WOWork units are responsible for deduplicating, sorting and joining all blocks. During the deduplication process they put their results in an internal data structure that is local to the aggregation. Every time that the conduit of the aggregation action gets pulled on, it looks in the data structure to see if it can produce any new work, for example by combining 2 existing chunks. Only after all the blocks have been produced can the deduplication action finally stream the deduplicated items.

Let's go through an example of a deduplicating and joining step, using these work units.

We zoom in on the highlighted section from the larger example above. In this case, we will deduplicate 2 blocks and join them. The yellow sections indicate the internal data structure described above. The colored blocks indicate the different producers of work.

In step 1., the evaluator asks (via the other producers) the join work producer (purple) for some work. Purple doesn't have any work, as it doesn't have all of its inputs. It asks its input (blue) if it has some work. Blue has some work (step 2.), so it gives that to purple. Purple moves that work along (step 3.), so it will reach the evaluator, which can execute it on a separate worker.

The blue work is running on the parallel worker. The evaluator doesn't need to wait for the work to complete. It can already ask purple for some more work (step 4.). As before, purple doesn't have all the work yet, so it asks its input for some more work. The answer comes back that there is more work from green. The work green has (step 5.) is sent to purple. Purple sends the work to the evaluator (step 6.).

The evaluator asks purple for some work for the third time (step 7.). The inputs of purple don't have any more work. Assuming the blue and green work has already completed, purple can make the join of the 2 blocks (step 8.) and yield it to the evaluator (step 9.). In the next blog post, we explain how we deal with the situation where this assumption does not hold yet.

The evaluator asks purple for the final time if it has something for us (step 10.). This time, the items are already computed by step 8. and available in the internal data structure (step 11.).

After this, the resulting items are either placed back into the resulting data structure to be combined with another block or are streamed towards the next conduit, if these were the final blocks that needed to be joined.
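The interplay between work units and the internal data structure can be sketched without any conduit machinery. Everything below is an assumption made for illustration: the WorkOr type is written down as we recall its shape from part 1, and Slot, newSlot, chunkWork and joinWork are hypothetical names. The sketch also ignores laziness; real work units would have to force their result so that the computation happens on the worker.

```haskell
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Assumed shape of the type from part 1: a stream element is either a unit
-- of parallel work or a regular value.
data WorkOr a = WOWork (IO ()) | WOValue a

-- | One entry of the aggregation's internal data structure. It stays empty
-- until the work unit that fills it has been run.
type Slot chunk = IORef (Maybe chunk)

newSlot :: IO (Slot chunk)
newSlot = newIORef Nothing

-- | Work for a single input chunk (blue and green in the walkthrough):
-- process the chunk and store the result in its slot.
chunkWork :: (chunk -> chunk) -> chunk -> Slot chunk -> WorkOr a
chunkWork process input out =
  WOWork (writeIORef out (Just (process input)))

-- | Work for a join (purple). It can only be produced once both input
-- slots are filled; until then there is nothing to hand out.
joinWork
  :: (chunk -> chunk -> chunk)
  -> Slot chunk -> Slot chunk -> Slot chunk
  -> IO (Maybe (WorkOr a))
joinWork join left right out = do
  l <- readIORef left
  r <- readIORef right
  pure $ case (l, r) of
    (Just ls, Just rs) -> Just (WOWork (writeIORef out (Just (join ls rs))))
    _                  -> Nothing
```

In terms of the walkthrough, steps 2. and 5. correspond to chunkWork, and step 8. corresponds to joinWork returning a work unit once both slots are filled.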

Conclusion

While we did not show a concrete implementation, we hope to have convinced you that our work-propagation system introduced in part 1 is flexible enough to implement aggregations, as well as the simpler map and filter actions.

The implementations for these actions are similar to what we run on our production servers. We still have some future work in parallelizing some other minor actions. Our current system supports both sequential and parallel actions interleaved. Having this freedom is a nice benefit of our system, but we would like to parallelize all implementations as much as possible. We can port these last sequential implementations to parallel versions, but as these actions are rarely bottlenecks, this doesn't have the highest priority.

Finally, aggregations as described are not always the most efficient approach. The described algorithm works decently for all cases, but there are specific cases that might benefit from their own, specialized, implementation. An example would be when counting numbers, where the join step is nothing more than adding two numbers together. It would be more efficient to do this within the producer instead of spawning work for this that can be executed on a separate thread, as the overhead of producing some Work trumps the benefit of executing + in parallel.

Next up!

Next in this four-part series is Parallel streaming in Haskell: Part 3 - A parallel work consumer. We will show a concrete implementation for the runConduitWithWork function from part 1:

runConduitWithWork :: Int -> ConduitT () (WorkOr Void) IO r -> IO r

This function runs the conduit and will consume and run any WOWork parallel work units that are produced in the meantime. The implementation is surprisingly simple!

You can discuss this blog post on Hacker News and on Reddit.


Yorick Sijsling, Software Development
Joris Burgers, Software Development