January 4, 2023
Channable is a feed processing tool where users define rules to organize and modify their product data. We process over 113 billion items per day, roughly 1.3 million items per second on average. These items range from shirts to screens to self-knitted dolls. As you can imagine, speedy execution is of the essence for us.
Over the last 2 years, we moved our inherently sequential data processing engine, written in Haskell, to a parallel version. Running the parallel version of our system barely increases CPU time, while the wall time (time from start to end) is significantly reduced.
This post explains how we parallelized our system without incurring any significant overhead costs, allowing us to speed up our workloads linearly with the number of cores available (up to a plateau, see plot below). Our design had two main requirements, which we come back to at the end of this post.
There's a lot to say about this topic, so we've written four separate blog posts for you to enjoy. The first post (this one) explains the problem and the core ideas. It uses Haskell examples, but we tried to keep it beginner-friendly. In the next three blog posts we'll discuss some of the highlights, technical details and interesting things we discovered along the way, for instance why TMVar is faster than MVar for our approach.
Addendum: We also gave a talk about this at the Utrecht Haskell Meetup on 5 June 2023. It's a quicker introduction to the core idea of our parallelization system. You can find the slides at haskell_meetup_parallel_streaming.pdf
Channable is a product feed processing tool. Our customers have a business, like a store, and they want to expose their products to the world. We import these items every day, process them according to the customers' wishes and export them to multiple platforms, such as online marketplaces, price comparison sites, advertisers, etc.
Customers modify their data using IF ... THEN ... ELSE ... rules. These rules are sent to our data processing engine, which executes them and returns the results for other services to use.
This engine runs on dedicated servers that receive requests through a REST API. The requests contain all information about how the data should be processed. Every incoming request is parsed into an AST for our internal language. The internal language consists of many layers, but we'll focus on actions and expressions.
Values in our language can be booleans, texts, numbers, hashmaps or lists. In all cases they're immutable, so an expression takes input values and produces a new output value. These expressions support generic concepts such as If, but also more domain-specific functions such as DecimalRound. An example of an expression is SetField(item, "foo", 3), which does not change item but returns a new item which has field foo set to 3.
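As a minimal sketch of what immutability means for SetField, the following models an item as a map from field names to values. The Value constructors, the Item representation and the setField name are our own assumptions for illustration (we use Data.Map from the containers package rather than a hashmap); the real engine's types are richer.

```haskell
import qualified Data.Map.Strict as Map

-- Hypothetical value type mirroring the kinds of values listed above.
data Value
  = VBool Bool
  | VText String
  | VNumber Double
  | VMap (Map.Map String Value)
  | VList [Value]
  deriving (Eq, Show)

-- Assumption: an item is a record of named fields.
type Item = Map.Map String Value

-- Like SetField(item, "foo", 3): build a new item and leave the input untouched.
setField :: Item -> String -> Value -> Item
setField item field value = Map.insert field value item
```

Because Map.insert returns a new map that shares the unchanged parts with the old one, the original item stays valid and unmodified after the call.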
While expressions work on individual values or items, actions work on the level of all items. An action may process all items at once or one at a time. The simplest actions are map and filter. A more complicated action would be deduplication, which removes all duplicate items. The following is, of course, a simplification of our actual AST (see footnote 1):
data Action
  = MapAction Expr
  -- ^ apply the expression; the expression should return an item
  | FilterAction Expr
  -- ^ keep the item if the expression returns True; the expression should return a boolean
  | DeduplicateAction Expr
  -- ^ if the expression returns the same value for two items, only the first of them is kept
Once we have compiled an AST for the request, we submit it as a job to our scheduler (see footnote 2). The scheduler decides when to run these jobs and how many resources (CPU cores) they're allowed to use. The scheduler tries to maximize throughput and minimize latencies.
One of the things that this scheduler takes into account when planning which job can go next is its priority; we distinguish two categories of jobs.
Now we have a compiled AST in our scheduler. Once the scheduler decides it needs to run our Job, the actual purpose of this blog can start: How do we get the results as fast as possible without any overhead?
Let's start with a naive evaluator of the datatype described above.
import Data.List (nubBy)

eval :: [Action] -> [Item] -> [Item]
-- No actions left: return the items unchanged
eval [] items = items
eval (MapAction e : rest) items = eval rest (map (expectItem . evalExpr e) items)
eval (FilterAction e : rest) items = eval rest (filter (expectBool . evalExpr e) items)
eval (DeduplicateAction e : rest) items =
  eval rest (nubBy (\i1 i2 -> evalExpr e i1 == evalExpr e i2) items)
evalExpr :: Expr -> Item -> Value
evalExpr = ...
expectItem :: Value -> Item
expectItem = ...
expectBool :: Value -> Bool
expectBool = ...
This evaluator executes the actions as described above, though not very efficiently. In the next section we will get rid of all the intermediate lists to make it faster. This code is also entirely sequential, but we can quite easily parallelize MapActions and FilterActions. In this blog post we'll show how we do this by splitting up the whole list of items into smaller chunks and evaluating each chunk on its own thread. We have a mostly-parallel implementation for DeduplicateAction as well. It's a bit more complicated than the implementations for MapAction and FilterAction, so we'll discuss that in our next blog post.
In the example above, each action acts as a function from [Item] to [Item]. If we want to apply multiple actions, we chain these functions, so every action produces a complete intermediate list that the next action consumes.
A big issue with this approach is that intermediate results are stored in memory in their entirety. These intermediate results don't just cost space but they also present a big challenge for the garbage collector (GC) that needs to inspect all the values inside. In some situations laziness may prevent this problem. We don't want to rely on laziness, however, because it's easy to get wrong and it's hard to ensure that it always works.
The standard solution to this is to use a streaming implementation. Instead of applying action 1 on all items then action 2 on all items and so forth, we do all actions on item 1, then all actions on item 2, etc. until all items are dealt with. Only one item is actively being worked on at any given time, so there is less data that the garbage collector needs to look at.
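The difference between the two evaluation orders can be sketched on plain lists. In this illustration (not Channable's code), each action is modelled as a per-item function that may drop the item; composing all steps means one item goes through every action before the next item is touched. Step, mapStep, filterStep and runSteps are hypothetical names.

```haskell
import Control.Monad ((>=>))
import Data.Maybe (mapMaybe)

-- Hypothetical model: an action applied to a single item, which may drop it.
type Step a = a -> Maybe a

mapStep :: (a -> a) -> Step a
mapStep f = Just . f

filterStep :: (a -> Bool) -> Step a
filterStep p x = if p x then Just x else Nothing

-- Compose all steps into one per-item function (Kleisli composition in Maybe)
-- and apply it item by item: item 1 passes every step before item 2 starts.
runSteps :: [Step a] -> [a] -> [a]
runSteps steps = mapMaybe (foldr (>=>) Just steps)
```

With lazy lists a chain of map and filter can behave similarly, which is the laziness caveat mentioned above; the explicit composition just makes the per-item order obvious instead of relying on it.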
Examples of this approach are numerous within the Haskell ecosystem and computer science literature, for instance short-cut fusion for lists and streaming implementations of many functions in Data.Vector and Data.Text. All of these are fairly specific to the corresponding types, but they show that the idea is valid.
In our implementation we chose a more direct way to represent streaming components using the Conduit library. Central to this library is the ConduitT type, which we'll colloquially refer to as 'a conduit'. We'll give a short overview of the parts that we're going to use, which will hopefully be enough to help you through this blog post.
A conduit of type ConduitT i o m () can be seen as a machine that consumes inputs of type i and produces (yields) outputs of type o. In other words, it acts as a stream transformer, because it turns a stream of i values into a stream of o values.
Some typical examples of conduits:
-- Produce a value of type o for each consumed value of type i, by applying the mapping function
Conduit.map :: Monad m => (i -> o) -> ConduitT i o m ()
-- Keeps only the values of type a for which the condition holds.
-- This does not modify the values, but might remove some or all of them.
Conduit.filter :: Monad m => (a -> Bool) -> ConduitT a a m ()
-- Construct a conduit that produces all the values from the input list.
-- The conduit does not consume any inputs (so doesn't restrict the parameter i).
Conduit.yieldMany :: Monad m => [o] -> ConduitT i o m ()
A conduit can perform monadic actions in the monad m. For instance, mapM could perform some monadic action every time an o is requested.
Conduit.mapM :: Monad m => (i -> m o) -> ConduitT i o m ()
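To make the 'machine' view concrete, here is a sketch of a map-like conduit written by hand with await and yield from Data.Conduit. The name myMap is ours; it is meant to behave like Conduit.map, not to reproduce how the library implements it.

```haskell
import Data.Conduit (ConduitT, await, runConduitPure, yield, (.|))
import qualified Data.Conduit.Combinators as Conduit

-- A hand-written equivalent of Conduit.map, using only await and yield.
myMap :: Monad m => (i -> o) -> ConduitT i o m ()
myMap f = loop
  where
    loop = do
      next <- await          -- ask upstream for one input
      case next of
        Nothing -> pure ()   -- upstream is exhausted: stop
        Just x -> do
          yield (f x)        -- hand one output downstream
          loop
```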
In addition to the stream of outputs, a conduit can return one final result at the very end. The last type parameter of ConduitT is used for that:
-- Consume values of type Int, don't produce anything, at the end return the sum of the values.
Conduit.sum :: Monad m => ConduitT Int o m Int
-- Consume values of type i, at the end return all of them as a list.
Conduit.sinkList :: Monad m => ConduitT i o m [i]
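Returning a final result works the same way when written by hand: a sink keeps calling await until it receives Nothing and then returns its accumulator. mySum is a hypothetical stand-in for Conduit.sum, specialised to Int.

```haskell
import Data.Conduit (ConduitT, await, runConduitPure, (.|))
import qualified Data.Conduit.Combinators as Conduit

-- Consume Ints until upstream is exhausted, then return their sum as the
-- conduit's final result (the last type parameter of ConduitT).
mySum :: Monad m => ConduitT Int o m Int
mySum = go 0
  where
    go acc = do
      next <- await
      case next of
        Nothing -> pure acc        -- no more input: deliver the result
        Just x -> go $! acc + x    -- strict accumulator
```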
If one conduit yields values of some type b and another conduit consumes values of that same type, the producer and consumer can be connected with the .| (fuse) operator. For example, Conduit.map f .| Conduit.map g makes a new conduit that applies f and then g to every value, i.e. the function g . f. The same ConduitT type is used to represent single components as well as whole pipelines with multiple components.
(.|) :: Monad m => ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
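A quick check of the fusion order, using the map combinator from Data.Conduit.Combinators: in a .| b, values flow from a to b, so two fused maps apply their functions left to right. fusedMaps is just a name for this example.

```haskell
import Data.Conduit (runConduitPure, (.|))
import qualified Data.Conduit.Combinators as Conduit

-- Fusing two maps: (+ 1) runs first, then (* 2), i.e. the function (* 2) . (+ 1).
fusedMaps :: [Int] -> [Int]
fusedMaps input =
  runConduitPure
    (Conduit.yieldMany input .| Conduit.map (+ 1) .| Conduit.map (* 2) .| Conduit.sinkList)
```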
Conduits can be evaluated only if they don't yield any outputs or consume any inputs. In the runConduit function below, the input type () and the output type Void enforce this property (see footnote 3). The conduit can deliver a single end result of type r. This means that if any part of the conduit produces outputs (with yield), they must be consumed by a later part of the conduit to get something that can be evaluated.
runConduit :: ConduitT () Void IO r -> IO r
With all of this, we can run a simple conduit:
λ> runConduit (Conduit.yieldMany [1,2,3] .| Conduit.map (*2) .| Conduit.sum)
12
The chain of actions from the earlier map/filter/map example can be written as:
let example = Conduit.yieldMany [1..8] .| Conduit.map (+1) .| Conduit.filter odd .| Conduit.map (*2) .| Conduit.sinkList
Evaluating example by itself only gives us a conduit; the results are computed once we call runConduit.
λ> runConduit example
[6,10,14,18]
Conduits (and many other streaming implementations) are pull-based, meaning that we pull at the end of the conduit for an item, and only then will the conduit do the work necessary to produce that item. If you do not consume any items from a stream, no work is executed. The conduit may produce the items by itself (as yieldMany does), or it may request one or more items from earlier conduits first (as map and filter do).
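Looking closely at the map/filter/map example, we see that running this conduit involves the following steps:
1. runConduit asks the consumer (sinkList, at the end of the conduit) to work towards an end result.
2. The consumer asks the map (*2) block for an item,
3. which asks the filter odd block for an item,
4. which asks the map (+1) block for an item,
5. which asks yieldMany for an item. The first item, 1, becomes 2 after map (+1), so filter odd drops it and asks for another item; the next item, 2, becomes 3, which is odd,
6. so 3 passes through filter odd and map (*2), and the resulting 6 is delivered to the consumer.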
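A consequence of pull-based evaluation is that a source may even be infinite, as long as something downstream stops pulling. This small sketch reuses the map/filter pattern from the running example together with Conduit.take from Data.Conduit.Combinators; only the work needed for three outputs is ever done. firstThreeResults is a name chosen for this example.

```haskell
import Data.Conduit (runConduitPure, (.|))
import qualified Data.Conduit.Combinators as Conduit

-- An infinite source is fine: Conduit.take 3 stops pulling after three items,
-- so yieldMany never has to produce more than the first handful of numbers.
firstThreeResults :: [Int]
firstThreeResults =
  runConduitPure
    ( Conduit.yieldMany [1 ..]
        .| Conduit.map (+ 1)
        .| Conduit.filter odd
        .| Conduit.take 3
        .| Conduit.sinkList
    )
```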
Instead of taking the streaming approach, one might try to use a better data type to store the intermediate results. Of course, there are data types other than [Item] that use less GC time, but as soon as a GC pass over your data type takes O(n) time, you'll run into trouble.
If all your elements have a constant memory size, this might be feasible: you could fit everything in an unboxed vector, which is super cheap for the garbage collector. This doesn't help us, though, as our items mostly consist of textual data of varying sizes. In an earlier blog post we discussed this problem and presented a solution with compact regions to get rid of these GC times quite effectively. This solution comes at the cost of raw writing speed, so we don't want to do that after every single action, but only when it's really necessary.
Within our job processing system, a single job usually starts and ends with one of those GC-friendly compact-region-based data sets. The data sets at the beginning and end are stored in a big cache and they're worth the extra work of storing them in a compact region (for which the garbage collector has to check fewer pointers). In between we try not to store our items at all, and since we only have one live item at a time, it can be represented in a way that is efficient for evaluating actions.
Not all actions can be performed in a completely streaming fashion. A typical example is the sorting action, where you can't know the first item that must be yielded before looking at all the input items. These actions will inevitably require some form of in-memory storage and we go to great lengths to do that in a GC-friendly manner.
Our use case for these streams involves a lot of embarrassingly parallel work like maps and filters. By running our jobs with multiple threads, we would hope to be able to apply these actions on multiple values in parallel, which should reduce waiting times for our customers.
It's not so easy though, because a stream implemented with conduits is fundamentally a single threaded component. It's impossible to use multiple threads to pull values from a single conduit at the same time (they could take turns, but all the work still happens sequentially). Internally, conduits are evaluated in a stepwise fashion and each step gives you a new conduit that can be evaluated further. Any conduit evaluation function will necessarily have to go through the steps in order, and there is no way to do the steps in parallel.
Next, we will show how we can get all the benefits from streaming, while running the evaluation on multiple threads.
As a first step, we split the incoming data into chunks and create a stream for each of those. Those streams can be represented by a conduit that yields items, but doesn't consume anything:
type ParallelStream = ConduitT () Item IO () -- Produces items
As a simple prototype, we could use lists of ParallelStreams, which might look something like this:
-- | Split a big list of items into chunks of the given size, and create a stream for each chunk
splitIntoListOfStreams :: Int -> [Item] -> [ParallelStream]
runStreamList :: [ParallelStream] -> IO [Item]
runStreamList can be implemented to use multiple threads. A single thread could take all the streams from the top-level list and assign each ParallelStream to a thread that becomes responsible for evaluating it. Only at the end does it need to concatenate all results to produce a single list of items. This relies on the ability to push most of the significant work into the parallelizable streams, whereas the top-level list must be cheap to evaluate because it is processed sequentially.
A naive implementation might add all the items to a single big list as soon as a thread finishes work on a stream. This is unlikely to do what you want, because the threads will not finish their work in a predetermined order. For deterministic results, the implementation should carefully maintain the order of the streams when concatenating their results. The items from stream 1 should go before the items from stream 2, which should go before the items from stream 3, and so forth. One way to do this is by letting runStreamList reserve a spot (like an MVar) for the data when a ParallelStream comes in.
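A minimal sketch of this prototype follows, assuming Item is just a String. It forks one thread per stream with forkIO (a real implementation would bound the number of threads) and reserves one MVar per stream in input order, so reading the MVars in list order gives a deterministic result no matter which thread finishes first. chunksOf is a hypothetical helper, and error handling is omitted: a stream that throws would leave its MVar empty and block the caller.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Data.Conduit (ConduitT, runConduit, (.|))
import qualified Data.Conduit.Combinators as Conduit

type Item = String -- assumption: a stand-in for the real item type
type ParallelStream = ConduitT () Item IO ()

-- Hypothetical helper: split a list into consecutive chunks of the given size.
chunksOf :: Int -> [a] -> [[a]]
chunksOf size xs
  | size <= 0 = error "chunksOf: chunk size must be positive"
  | null xs = []
  | otherwise = let (chunk, rest) = splitAt size xs in chunk : chunksOf size rest

splitIntoListOfStreams :: Int -> [Item] -> [ParallelStream]
splitIntoListOfStreams size = map Conduit.yieldMany . chunksOf size

-- Start one thread per stream. Each thread fills the MVar reserved for its
-- stream, so reading the MVars in list order preserves the input order.
runStreamList :: [ParallelStream] -> IO [Item]
runStreamList streams = do
  slots <- mapM start streams
  concat <$> mapM takeMVar slots
  where
    start stream = do
      slot <- newEmptyMVar
      _ <- forkIO (runConduit (stream .| Conduit.sinkList) >>= putMVar slot)
      pure slot
```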
By using lists of ParallelStreams, we can implement actions like map and filter quite easily, and in a way that doesn't interfere with the parallelism of the streams.
mapListOfStreams :: (Item -> Item) -> [ParallelStream] -> [ParallelStream]
filterListOfStreams :: (Item -> Bool) -> [ParallelStream] -> [ParallelStream]
By adjusting our running example to use these parallel streaming functions with chunks of size 4, we get two threads that each run one conduit processing 4 items.
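Under the same kind of assumptions (here Item is an Int so we can mirror the numbers of the running example), both functions can be sketched by attaching a conduit to the end of every stream. Nothing is evaluated when the lists are built; the work happens when a stream is run. collectStreams is a hypothetical helper that runs the streams one after another, just to inspect the results.

```haskell
import Data.Conduit (ConduitT, runConduit, (.|))
import qualified Data.Conduit.Combinators as Conduit

type Item = Int -- assumption: numbers, to mirror the running example
type ParallelStream = ConduitT () Item IO ()

-- Attach the per-item operation to the end of every stream. Nothing runs yet.
mapListOfStreams :: (Item -> Item) -> [ParallelStream] -> [ParallelStream]
mapListOfStreams f = map (.| Conduit.map f)

filterListOfStreams :: (Item -> Bool) -> [ParallelStream] -> [ParallelStream]
filterListOfStreams p = map (.| Conduit.filter p)

-- The running example with chunks of size 4: two streams of 4 items each.
runningExample :: [ParallelStream]
runningExample =
  mapListOfStreams (* 2)
    . filterListOfStreams odd
    . mapListOfStreams (+ 1)
    $ map Conduit.yieldMany [[1, 2, 3, 4], [5, 6, 7, 8]]

-- Sequential evaluation, one stream after another (for inspection only).
collectStreams :: [ParallelStream] -> IO [[Item]]
collectStreams = mapM (\stream -> runConduit (stream .| Conduit.sinkList))
```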
Sometimes we have work that should be done in parallel without directly producing items. Consider, for example, a pipeline that contains a group action.
The group action drastically reduces the number of items. It might receive 10,000 parallel streams, but since it only produces a few items, it should ideally only produce a single ParallelStream. The problem is that the amount of parallelism in our runStreamList function directly corresponds to the number of ParallelStreams that it gets. If it only receives one ParallelStream, then it can only use one thread to evaluate the whole pipeline.
Additionally, the group action needs to consume all the incoming items before it knows what the first output item will be. So before even producing the first ParallelStream, a group action has to do a whole lot of work, including consuming all the ParallelStreams that it receives. Most of this work, certainly the consumption of those streams, can in principle be done in parallel. We'll explain more about that in the blog post about aggregations.
To get all that parallel work evaluated, we simply pass it along just like ParallelStreams. For this we introduce a new data type:
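-- Deriving these instances needs the DeriveTraversable extension (or GHC2021).
data WorkOr a
  = WOWork (IO ()) -- ^ A unit of work, to be executed at most once.
                   -- Different work units can run in parallel.
  | WOValue a      -- ^ A value, such as a ParallelStream.
  deriving (Functor, Foldable, Traversable)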
Where we used to have a ParallelStream, we now wrap it in a WOValue constructor and get a WorkOr ParallelStream. The WOWork constructor contains some arbitrary IO () that can be executed in parallel with other work. This IO () does not return any value, so it is itself responsible for storing the result of the computation, using an IORef or an MVar, for example.
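A small sketch of how the derived instances behave: fmap only touches WOValue, and a work unit passes its result on through a variable it closes over. squareInto and runOne are hypothetical helpers for illustration, and Int values stand in for ParallelStreams.

```haskell
{-# LANGUAGE DeriveTraversable #-}
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

data WorkOr a
  = WOWork (IO ())
  | WOValue a
  deriving (Functor, Foldable, Traversable)

-- Hypothetical helper: a work unit has no result of its own, so it writes
-- into a variable that it closes over.
squareInto :: IORef Int -> Int -> WorkOr a
squareInto ref n = WOWork (writeIORef ref (n * n))

-- Execute a work unit (returning Nothing) or hand back the wrapped value.
runOne :: WorkOr a -> IO (Maybe a)
runOne (WOWork work) = work >> pure Nothing
runOne (WOValue x) = pure (Just x)
```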
In principle we could lift any IO () into a ParallelStream that runs that IO without yielding any items, but we find an explicit representation to be more convenient. It also allows for some more optimizations, as we don't care about the order of WOWork, but do care about the order of ParallelStreams.
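For comparison, the 'lift any IO ()' alternative can be sketched in a single line with liftIO, since ConduitT has a MonadIO instance. workAsStream is a hypothetical name and Item is assumed to be a String. The drawback is visible in the types: such a stream is indistinguishable from a real one, so an evaluator would have to keep it in order like any other stream.

```haskell
import Control.Monad.IO.Class (liftIO)
import Data.Conduit (ConduitT, runConduit, (.|))
import qualified Data.Conduit.Combinators as Conduit
import Data.IORef (modifyIORef', newIORef, readIORef)

type Item = String -- assumption for the example
type ParallelStream = ConduitT () Item IO ()

-- Any IO () can pose as a stream that runs the action and yields no items.
workAsStream :: IO () -> ParallelStream
workAsStream = liftIO
```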
Since we want to lazily unroll the top-level list, we replace it with a conduit so that we get more control over when it gets evaluated. The typical usage pattern is now to have two layers of conduits. For instance, a parallel map now produces a conduit that both consumes and produces WorkOr ParallelStream values:
parallelMap :: (Item -> Item) -> ConduitT (WorkOr ParallelStream) (WorkOr ParallelStream) IO ()
This new parallel map still applies the mapping function to each item in every stream, and in addition, any parallel work (WOWork constructors) is passed along unaltered.
Many actions will simply pass along the work, but some will create new parallel work. We'll see a few examples of these later.
All our conduits get connected together, and at the very end the parallel work is consumed and evaluated using multiple threads. We define a special runConduitWithWork function to evaluate these work-producing conduits. Whereas the runConduit function does not allow the conduit to produce anything at all (besides the end result), this new function does allow it to produce WOWork parallel work units.
-- Standard conduit evaluation
runConduit :: ConduitT () Void IO r -> IO r
-- Parallel evaluation of produced work units, using the number of threads as the argument
runConduitWithWork :: Int -> ConduitT () (WorkOr Void) IO r -> IO r
In an earlier variation we used a separate conduit of type ConduitT (WorkOr a) a IO () to consume the work. By adding such a work-consuming conduit at the end, we were able to use the regular runConduit function to evaluate everything. There are some drawbacks to that approach for our specific situation, but we'll discuss those, and the inner workings of this function, later in Parallel streaming in Haskell: Part 3 - A parallel work consumer. For now we ask you to trust us that we have good reasons to have a dedicated runConduitWithWork function.
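For readers who want something runnable now, here is a deliberately simple sketch of a function with the same signature. It is our own illustration, not the implementation from Part 3: the calling thread runs the top-level conduit and pushes every work unit onto an unbounded Chan, and numThreads worker threads (at least one is required) execute them. It does not handle exceptions: a failing work unit kills its worker, after which the caller waits for that worker forever or fails with BlockedIndefinitelyOnMVar. squaresInParallel is a hypothetical usage example.

```haskell
{-# LANGUAGE LambdaCase #-}
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Monad (forM_, replicateM, replicateM_)
import Control.Monad.IO.Class (liftIO)
import Data.Conduit (ConduitT, awaitForever, fuseUpstream, runConduit, yield)
import Data.Void (Void, absurd)

data WorkOr a = WOWork (IO ()) | WOValue a

-- Simplified sketch: this thread runs the conduit and queues every work unit;
-- numThreads workers (>= 1) execute the queued work until told to stop.
runConduitWithWork :: Int -> ConduitT () (WorkOr Void) IO r -> IO r
runConduitWithWork numThreads source = do
  queue <- newChan
  finished <- replicateM numThreads newEmptyMVar
  let worker done =
        readChan queue >>= \case
          Just work -> work >> worker done
          Nothing -> putMVar done ()
      enqueue :: ConduitT (WorkOr Void) Void IO ()
      enqueue = awaitForever $ \case
        WOWork work -> liftIO (writeChan queue (Just work))
        WOValue v -> absurd v
  forM_ finished (forkIO . worker)
  result <- runConduit (source `fuseUpstream` enqueue)
  -- All work has been queued: send one stop signal per worker, then wait.
  replicateM_ numThreads (writeChan queue Nothing)
  mapM_ takeMVar finished
  pure result

-- Usage sketch: emit ten work units, then block until every slot is filled.
squaresInParallel :: Int -> IO [Int]
squaresInParallel numThreads = runConduitWithWork numThreads $ do
  slots <- liftIO (replicateM 10 newEmptyMVar)
  forM_ (zip [1 ..] slots) $ \(n, slot) -> yield (WOWork (putMVar slot (n * n)))
  liftIO (mapM takeMVar slots)
```

Note that the top-level conduit itself blocks in takeMVar until the workers have filled every slot; this only works because the workers run on threads other than the one evaluating the conduit.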
Let's practice a bit with how our new work-enabled conduits can be used. To start with, this is what a filter action might look like:
parallelFilter :: (Item -> Bool) -> ConduitT (WorkOr ParallelStream) (WorkOr ParallelStream) IO ()
parallelFilter filterFunction =
  let filterStream :: ParallelStream -> ParallelStream
      filterStream parallelStream = parallelStream .| Conduit.filter filterFunction
  in Conduit.map (fmap filterStream) -- Uses the Functor instance of `WorkOr`
All of the parallel work units are passed through unmodified, because fmap does not touch them. Each incoming ParallelStream is adjusted to apply the filter action at the end. Note that filterFunction is not yet executed when you run the top-level conduit. Only when a consumer runs one of the ParallelStream conduits that we modified here is the work done. This property is important because the top-level conduit gets evaluated sequentially, while the ParallelStreams (and hence the executions of filterFunction) get evaluated in parallel.
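This property can be observed directly. The sketch below wraps the predicate of a parallelFilter-like conduit in a counter (countingFilter, demoLaziness and the Int items are our own assumptions): after running only the top-level conduit the predicate has not been called at all, and only running the streams themselves triggers the six calls.

```haskell
{-# LANGUAGE DeriveFunctor #-}
import Data.Conduit (ConduitT, runConduit, (.|))
import qualified Data.Conduit.Combinators as Conduit
import Data.IORef (IORef, modifyIORef', newIORef, readIORef)

type Item = Int -- assumption for the example
type ParallelStream = ConduitT () Item IO ()

data WorkOr a = WOWork (IO ()) | WOValue a
  deriving (Functor)

-- Same shape as parallelFilter, but the predicate counts how often it runs.
countingFilter :: IORef Int -> (Item -> Bool) -> ConduitT (WorkOr ParallelStream) (WorkOr ParallelStream) IO ()
countingFilter counter p = Conduit.map (fmap (.| Conduit.filterM counted))
  where
    counted x = modifyIORef' counter (+ 1) >> pure (p x)

-- Returns (calls after the top-level conduit, calls after the streams, items).
demoLaziness :: IO (Int, Int, [Item])
demoLaziness = do
  counter <- newIORef 0
  let streams :: [WorkOr ParallelStream]
      streams = [WOValue (Conduit.yieldMany chunk) | chunk <- [[1, 2, 3], [4, 5, 6]]]
  outer <- runConduit (Conduit.yieldMany streams .| countingFilter counter even .| Conduit.sinkList)
  before <- readIORef counter
  items <- concat <$> mapM runStream outer
  after <- readIORef counter
  pure (before, after, items)
  where
    runStream (WOValue stream) = runConduit (stream .| Conduit.sinkList)
    runStream (WOWork work) = work >> pure []
```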
In a similar fashion we can define a parallel map and a parallel mapM. Both pass along the parallel work unmodified and just adjust the ParallelStreams. The only difference between the implementations of parallelMap and parallelMapM is the conduit we attach to the end of the ParallelStream:
parallelMap :: (Item -> Item) -> ConduitT (WorkOr ParallelStream) (WorkOr ParallelStream) IO ()
parallelMap f =
let mapStream parallelStream = parallelStream .| Conduit.map f
in Conduit.map (fmap mapStream)
parallelMapM :: (Item -> IO Item) -> ConduitT (WorkOr ParallelStream) (WorkOr ParallelStream) IO ()
parallelMapM f =
let mapMStream parallelStream = parallelStream .| Conduit.mapM f
in Conduit.map (fmap mapMStream)
We can define a helper function to generalize this pattern. An actual implementation is left as an exercise to the reader.
liftConduit :: ConduitT Item Item IO () -> ConduitT (WorkOr ParallelStream) (WorkOr ParallelStream) IO ()
The parallelMapM function can now be defined in terms of liftConduit:
parallelMapM f = liftConduit (Conduit.mapM f)
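One possible solution to the exercise, as a sketch: specialised to the Item and ParallelStream types used by parallelMapM, liftConduit only has to attach the per-item conduit to the end of every stream. Item is assumed to be an Int here, and runSequentially is a hypothetical helper that runs the streams one after another to check the results.

```haskell
{-# LANGUAGE DeriveFunctor #-}
import Data.Conduit (ConduitT, runConduit, (.|))
import qualified Data.Conduit.Combinators as Conduit

type Item = Int -- assumption for the example
type ParallelStream = ConduitT () Item IO ()

data WorkOr a = WOWork (IO ()) | WOValue a
  deriving (Functor)

type Lifted = ConduitT (WorkOr ParallelStream) (WorkOr ParallelStream) IO ()

-- Attach a per-item conduit to the end of every stream; work passes through.
liftConduit :: ConduitT Item Item IO () -> Lifted
liftConduit perItem = Conduit.map (fmap (.| perItem))

parallelMap :: (Item -> Item) -> Lifted
parallelMap f = liftConduit (Conduit.map f)

parallelFilter :: (Item -> Bool) -> Lifted
parallelFilter p = liftConduit (Conduit.filter p)

parallelMapM :: (Item -> IO Item) -> Lifted
parallelMapM f = liftConduit (Conduit.mapM f)

-- Run the top-level conduit, then every stream in order (sequentially).
runSequentially :: [[Item]] -> Lifted -> IO [Item]
runSequentially chunks pipeline = do
  let source = Conduit.yieldMany (map (WOValue . Conduit.yieldMany) chunks)
  outer <- runConduit (source .| pipeline .| Conduit.sinkList)
  concat <$> mapM runStream outer
  where
    runStream (WOValue stream) = runConduit (stream .| Conduit.sinkList)
    runStream (WOWork work) = work >> pure []
```

A design consequence worth keeping in mind: the lifted conduit runs separately inside every stream, so a stateful conduit such as Conduit.take 3 would take three items per chunk rather than three items in total. Only per-item conduits like map, filter and mapM lift this cleanly.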
ParallelStreams don't come out of thin air, so we need some way to produce them. Let's say that we start with a list of items and that we have a toChunks function of type Int -> [Item] -> [[Item]] that splits the list into chunks of the given size. Now we can define a function that converts every chunk into one parallel stream:
yieldParallel :: Int -> [Item] -> ConduitT i (WorkOr ParallelStream) IO ()
yieldParallel chunkSize =
Conduit.yieldMany . map chunkToStream . toChunks chunkSize
chunkToStream :: [Item] -> WorkOr ParallelStream
chunkToStream chunk = WOValue (Conduit.yieldMany chunk)
Here we have a Conduit.yieldMany on two different levels. One is for the outer conduit and the other is for the inner conduits (the ParallelStreams).
The items that each stream will yield are completely fixed from the start. For example, with a chunk size of 100 the first stream will always yield items 1-100, the second stream will always yield items 101-200 and so forth. This is always the same, regardless of the order in which the streams actually get evaluated.
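The post does not show toChunks itself. A minimal list-based sketch (ours, generalised to any element type) makes the determinism argument concrete: the chunk boundaries depend only on the chunk size and the input list, never on evaluation order.

```haskell
-- Hypothetical list-based toChunks: consecutive chunks of the given size,
-- where only the last chunk may be shorter.
toChunks :: Int -> [a] -> [[a]]
toChunks size xs
  | size <= 0 = error "toChunks: chunk size must be positive"
  | null xs = []
  | otherwise = chunk : toChunks size rest
  where
    (chunk, rest) = splitAt size xs
```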
To ensure that parallelism works well, the top-level conduit must be fast to evaluate, which includes the toChunks function. Reading the chunks is done within the ParallelStream, so it's less of a problem if that's a bit slower. In the real implementation, instead of using a simple list of items, we specialized the data format for efficient chunking, while de-prioritizing reading. In some cases, the toChunks implementation will even postpone some of its work until you actually stream the chunk.
We've talked about consuming parallel work and how to produce ParallelStreams, but we're still missing the connection between the two. The sinkItems function bridges that gap:
sinkItems :: ConduitT (WorkOr ParallelStream) (WorkOr o) IO [Item]
This function consumes all incoming streams and produces a single list of items at the end. It's similar to what Conduit.sinkList combined with concat would do. We have to be careful, though: evaluating the top-level conduit must not do all of the work we've built up in the parallel streams. Instead, this sinkItems function produces WOWork parallel work units that each consume a ParallelStream and produce a chunk of items. All these work units can run at the same time, and once they're all done, sinkItems can de-chunk and return the complete list of items in order.
To make this all a bit more concrete, we show a working implementation of sinkItems. It builds up a list chunkVars :: [MVar [Item]] that will contain the result of every individual stream, in the precise order in which the streams came in, regardless of when they are actually evaluated. This makes the order of the results completely deterministic, just as we saw before with runStreamList.
{-# LANGUAGE LambdaCase #-}
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad.IO.Class (liftIO)

sinkItems :: ConduitT (WorkOr ParallelStream) (WorkOr o) IO [Item]
sinkItems = loop []
  where
    loop :: [MVar [Item]] -> ConduitT (WorkOr ParallelStream) (WorkOr o) IO [Item]
    loop chunkVars =
      Conduit.await >>= \case
        -- When we receive a stream...
        Just (WOValue stream) -> do
          chunkVar <- liftIO newEmptyMVar
          -- ...construct a work unit that consumes the stream and puts the
          -- result in a newly allocated variable.
          let work = do
                chunk <- runConduit (stream .| Conduit.sinkList)
                putMVar chunkVar chunk
          Conduit.yield (WOWork work)
          loop (chunkVars ++ [chunkVar])
        -- Any existing work is simply passed through
        Just (WOWork w) -> do
          Conduit.yield (WOWork w)
          loop chunkVars
        -- A 'Nothing' means that all the conduits before this one have finished.
        -- We now gather all the chunks, concatenate, and return the list of items.
        -- The 'takeMVar' ensures that we wait for all the parallel work to complete.
        Nothing ->
          fmap concat $ traverse (liftIO . takeMVar) chunkVars
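To see the ordering guarantee at work, the following sketch runs a condensed copy of sinkItems (it prepends the MVars and reverses the list at the end, which avoids the repeated ++ but yields the same order) with a hypothetical test driver, runReversed, that forks every work unit and delays earlier units longer, so later chunks finish first. Item is assumed to be an Int. The result still comes out in the original order.

```haskell
{-# LANGUAGE LambdaCase #-}
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import Control.Monad (void)
import Control.Monad.IO.Class (liftIO)
import Data.Conduit (ConduitT, await, awaitForever, fuseUpstream, runConduit, yield, (.|))
import qualified Data.Conduit.Combinators as Conduit
import Data.IORef (atomicModifyIORef', newIORef)
import Data.Void (Void, absurd)

type Item = Int -- assumption for the example
type ParallelStream = ConduitT () Item IO ()
data WorkOr a = WOWork (IO ()) | WOValue a

-- Condensed sinkItems: one MVar per stream, read back in arrival order.
sinkItems :: ConduitT (WorkOr ParallelStream) (WorkOr o) IO [Item]
sinkItems = loop []
  where
    loop :: [MVar [Item]] -> ConduitT (WorkOr ParallelStream) (WorkOr o) IO [Item]
    loop slots =
      await >>= \case
        Just (WOValue stream) -> do
          slot <- liftIO newEmptyMVar
          yield (WOWork (runConduit (stream .| Conduit.sinkList) >>= putMVar slot))
          loop (slot : slots)
        Just (WOWork work) -> yield (WOWork work) >> loop slots
        Nothing -> concat <$> mapM (liftIO . takeMVar) (reverse slots)

-- Hypothetical test driver: fork every work unit, delaying earlier units
-- longer, so that later chunks finish first.
runReversed :: Int -> ConduitT () (WorkOr Void) IO r -> IO r
runReversed total source = do
  counter <- newIORef 0
  let forkAll :: ConduitT (WorkOr Void) Void IO ()
      forkAll = awaitForever $ \case
        WOValue v -> absurd v
        WOWork work -> liftIO $ do
          index <- atomicModifyIORef' counter (\n -> (n + 1, n))
          void (forkIO (threadDelay ((total - index) * 10000) >> work))
  runConduit (source `fuseUpstream` forkAll)

orderedResult :: IO [Item]
orderedResult = runReversed 5 (Conduit.yieldMany streams .| sinkItems)
  where
    streams = [WOValue (Conduit.yieldMany [c * 10 .. c * 10 + 2]) | c <- [0 .. 4 :: Int]]
```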
We now finally have all the pieces to use our parallel streams. In particular, the yieldParallel and sinkItems conduits can be combined to obtain a conduit that fits perfectly in our runConduitWithWork function:
yieldAndSink :: [Item] -> ConduitT i (WorkOr o) IO [Item]
yieldAndSink items = yieldParallel 100 items .| sinkItems
-- To remind you:
runConduitWithWork :: Int -> ConduitT () (WorkOr Void) IO r -> IO r
With this, we can write a very convoluted parallel map. The demo function receives the chunk size that you want and the number of threads to use, and it runs some slowComputation on every item.
import Control.Concurrent (threadDelay)
import System.IO (hFlush, stdout)

-- Assumes type Item = String and the functions defined earlier in this post.
demo :: Int -> Int -> IO ()
demo chunkSize numThreads = do
  let items :: [Item]
      items = ["1", "2", "3", "4", "5", "6", "7", "8", "9"]

      slowComputation :: Item -> IO Item
      slowComputation item = do
        threadDelay 1000000 -- Delay 1 second (pretend that this is some slow computation)
        putStr item         -- Show when the computation is done
        hFlush stdout
        pure item

      theConduit :: ConduitT i (WorkOr o) IO [Item]
      theConduit =
        yieldParallel chunkSize items .| parallelMapM slowComputation .| sinkItems

  putStrLn $ "Evaluating with chunk size " <> show chunkSize <>
    " on " <> show numThreads <> " threads:"
  r <- runConduitWithWork numThreads theConduit
  putStrLn $ "\nEnd result: " <> show r
Using one thread, all the items are processed sequentially and it takes 9 seconds.
Increasing the number of threads reduces the time spent, but the end result remains the same. Note that the order in which the items are printed varies, while the final result stays constant.
With a chunk size of 3, each chunk still takes 3 seconds to run. That puts a lower bound on the run time regardless of the number of threads, so to decrease the run time further you need smaller chunks.
In the real world you want the chunks to be big enough that you're not spending all your time in the sequential sections of the code, but small enough to allow a fair distribution of work over the threads.
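The trade-off can be estimated with a back-of-the-envelope model. Under idealised assumptions of our own (every item takes the same time, whole chunks are handed to idle threads in rounds, and the sequential parts cost nothing), the run time is the number of rounds times the time per chunk. It reproduces the 9 seconds on one thread and the 3-second floor for chunks of 3 from the demo; when the item count is not a multiple of the chunk size it gives an upper bound. estimatedSeconds is an illustrative name.

```haskell
-- Idealised model: equal cost per item, chunks assigned in rounds,
-- no sequential overhead. Arguments: seconds per item, item count,
-- chunk size, number of threads.
estimatedSeconds :: Int -> Int -> Int -> Int -> Int
estimatedSeconds secondsPerItem itemCount chunkSize numThreads =
  rounds * chunkSize * secondsPerItem
  where
    chunkCount = (itemCount + chunkSize - 1) `div` chunkSize -- ceiling division
    rounds = (chunkCount + numThreads - 1) `div` numThreads
```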
As shown in this blog, we have designed and implemented a scalable approach to dealing with parallelism.
Instead of working on all items at once, we use parallel streams to reduce memory pressure. This is based on the principle of streaming, where only a few items are kept in memory at any given moment. All other items are in GC-friendly data structures, allowing us to handle huge datasets without any memory problems. Using the parallel streams and parallel work, we can implement a large variety of actions, such as deduplication (which we'll show in Parallel streaming in Haskell: Part 2 - Optimized parallel aggregations).
Actions are defined in a modular fashion. An action implementation doesn't need to know which actions come before or after it; it only needs to implement its own logic in terms of manipulating the streams that come in. Many parallel actions are very simple to implement on these conduits with parallel streams.
The approach of using a Conduit as a dynamic work producer allows us to scale easily. The work is divided into small chunks, allowing us to easily distribute the work over multiple, independent threads. We can consume these streams with any number of threads and can even dynamically adjust the number of running threads during execution. The evaluator does the heavy lifting, pulling on the streams and evaluating the work.
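This implementation of parallelism works really well in practice. We had two big, somewhat competing, requirements: low latency (short run times for individual jobs) and high throughput (parallelism should not waste so much CPU time that the server can run fewer jobs).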
Below we've plotted the run times vs. the number of threads for five example jobs. All of them follow a neat exponential decay towards a plateau, which is precisely what you want to see in a good implementation of parallelism. The height of the plateau depends on how many parallelizable parts there are and on the overhead of the parallelism implementation. Here we see that Job 2 doesn't parallelize at all, while Job 5 is around 5 times as fast. An important property here is that the run times never increase when we add more threads.
Throughput is the number of jobs that the server can run in a given time period. For most jobs the main bottleneck is the CPU (which is why it makes sense to parallelize), so as long as the addition of threads doesn't increase the CPU time too much we should see similar throughput.
For example, if we compare using 1 CPU for 16 seconds with using 8 CPUs for 2 seconds, we get the same total of 16 CPU seconds, so total throughput on the server remains the same. The latter has a lower latency, so using multithreading is a no-brainer. If the parallel implementation is a bit less efficient and has to use 8 CPUs for 4 seconds (32 CPU seconds total), the additional threads still give a good improvement in latency, but you're halving the throughput on the server.
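The arithmetic behind this comparison fits in two small functions, assuming a CPU-bound server on which throughput is inversely proportional to the CPU seconds each job consumes. cpuSeconds and relativeThroughput are illustrative names.

```haskell
-- CPU seconds consumed by one job: cores used times wall-clock seconds.
cpuSeconds :: Double -> Double -> Double
cpuSeconds cores wallSeconds = cores * wallSeconds

-- Throughput of a candidate configuration relative to a baseline, assuming
-- the server is CPU-bound, so jobs per hour scale with 1 / CPU seconds per job.
relativeThroughput :: Double -> Double -> Double
relativeThroughput baselineCpu candidateCpu = baselineCpu / candidateCpu
```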
In the plot for actual CPU time spent vs the number of threads, we see that parallel jobs do use a bit more CPU time but it remains quite reasonable. At some point adding more threads isn't worth it anymore, because the run time barely decreases while the CPU time keeps climbing.
The next part in this series is Parallel streaming in Haskell: Part 2 - Optimized parallel aggregations, where we will explain in detail how we implemented parallel aggregations, such as deduplication, sorting and grouping.
1: Our actual AST has more layers than just actions and expressions and more actions than mentioned here. At the moment of writing, we have 29 possible actions, as well as over 70 different expression constructs. ↩
2: Most of the incoming jobs are split up into smaller joblets that can be scheduled individually. The results of these smaller joblets are written to reusable caches, so they can be shared between similar jobs. ↩
3: For a good explanation of why () and Void are the right types for the conduit, check the conduit README. ↩
Are you interested in working at Channable? Check out our vacancy page to see if we have an open position that suits you!