Tech
July 30, 2025
We are happy to announce the open-source release of opsqueue, our opinionated queueing system!
Why would you want to use it?
opsqueue is a good choice if you have a use case where you first generate a few million operations (an "operation" is any task that can be executed within a few seconds) and then later execute those operations. In our case, and we expect in many other use cases as well, the systems that generate the operations are often different from the systems that execute them. We usually want specialized consumers that maximize throughput (within certain constraints, such as rate limits) by processing operations in parallel.
Let's take a look at how it works.
We saw the following pattern in our code over and over again:
There are several reasons why service A often doesn't apply the operations itself and instead offloads them to another service. Usually service A is either not scalable enough, or not specialized for the task at hand, such as calling an LLM service, editing an image, or talking to an external API with strict rate limits.
By offloading these tasks to more specialized services, we can optimize each service for its specific task.
We call services that place input into the queue 'Producers', and services that take work from the input queue and write results back to the output queue 'Consumers'.
The description of the transformation to apply to a particular element we call an 'Operation'. We chose this word rather than a more general term like 'task' because individual operations ought to take milliseconds to a couple of seconds at most.
opsqueue is thus a queue for operations.
You can consider this pattern, which we've coined 'generate-execute', a specialized instance of the producer-consumer pattern[1].
You can also look at it as 'MapReduce without the reduce', or indeed as 'just' a (distributed) parallel map.
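To make that concrete, here is a minimal, library-agnostic sketch of generate-execute as a parallel map over a collection of operations. The function names are purely illustrative and none of this is opsqueue's API; opsqueue's job is to let the two halves run in different services, at different times, and at a much larger scale.

```python
from concurrent.futures import ProcessPoolExecutor

# Illustrative stand-ins: in a real deployment the generating and the executing
# side would be separate services, communicating through the queue.
def generate_operations():
    """The 'generate' step: produce a (potentially huge) collection of operations."""
    return [{"op_id": i, "payload": f"item-{i}"} for i in range(1_000)]

def execute_operation(op):
    """The 'execute' step: a short transformation, ideally milliseconds to seconds."""
    return {"op_id": op["op_id"], "result": op["payload"].upper()}

if __name__ == "__main__":
    operations = generate_operations()           # producer side
    with ProcessPoolExecutor() as pool:          # consumer side: a parallel map
        results = list(pool.map(execute_operation, operations, chunksize=100))
    print(f"executed {len(results)} operations")
```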
Obviously there are cases in which this distributed batch processing pattern doesn't apply: If your collection of data is small enough and the transformation on your data is not heavy enough to warrant running it on an external system, you could use a normal mapping operation, or
you could implement parallel streaming instead.
Also, if you do not care about the order between items, there are many existing queueing systems that can help you out. One example would be message brokers like RabbitMQ (or hosted versions like Google's PubSub or Amazon's SQS),
another, if you want the same streams to be handled in different ways by multiple services, would be distributed stream-processing platforms like Apache Kafka.
However, we have the following requirements that such systems do not meet:
We deem it likely that these requirements are similar in many other software projects outside of Channable as well.
We also had the following constraints and desires:
Especially the ability to let a Consumer decide on the priority between operations is a killer feature for us. We'll revisit it in detail in the 'Consumer Strategies' section below.
In the past, we have used the following ad-hoc implementations in different places:
Queues based on Redis' list data structure.
This is not recommended, because Redis lists do not model queues, meaning you need to build your own bespoke queueing abstractions on top, which is easy to get wrong. Besides this, while Redis shines when it is used in its normal ephemeral mode, running it in its alternative persistent mode requires extra low-level interaction with its replicas.
Queues based on Postgres database tables.
This works reasonably well, especially with the newer SELECT FOR UPDATE ... SKIP LOCKED functionality (a short sketch of this pattern follows below). However, it is easy to hold it wrong, in which case you will get race conditions, deadlocks, slow performance, or all of the above.
It also doesn't allow us to connect many consumers at the same time, since Postgres database connections are quite heavyweight in terms of both memory and runtime, in large part because of the requirements of Multiversion Concurrency Control.
(Postgres) Database tables for metadata, combined with Object Storage for the actual data.
While keeping most of the drawbacks of using Postgres on its own, this worked remarkably well to reduce the pressure on the database and thereby improve throughput. We carried this idea over into the design of the new system.
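For readers who have not seen it, the core of that Postgres pattern looks roughly like the sketch below. The table and column names are made up for illustration, and this is not opsqueue code: each worker claims a batch of rows that no other worker currently has locked, inside a single transaction.

```python
import psycopg2

# A sketch of the classic "Postgres table as a queue" pattern. Each worker
# claims up to 100 pending rows that no other worker has locked.
CLAIM_BATCH = """
    WITH next_batch AS (
        SELECT id
          FROM operations
         WHERE status = 'pending'
         ORDER BY id
         LIMIT 100
           FOR UPDATE SKIP LOCKED   -- rows locked by other workers are skipped
    )
    UPDATE operations o
       SET status = 'in_progress'
      FROM next_batch
     WHERE o.id = next_batch.id
 RETURNING o.id, o.payload;
"""

def claim_batch(conn):
    # `with conn` commits on success and rolls back on error (psycopg2 semantics)
    with conn, conn.cursor() as cur:
        cur.execute(CLAIM_BATCH)
        return cur.fetchall()

conn = psycopg2.connect("dbname=queue_example")  # illustrative DSN
for op_id, payload in claim_batch(conn):
    ...  # process the operation, then mark it 'done' (or 'failed')
```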
We saw that we were implementing ad-hoc queue-like systems and producer/consumer logic multiple times, and therefore we decided to create "One Systematic Implementation To Rule Them All".
Does this run into the 'there are now 15 competing standards' problem? We think not, as we will explain in more detail below!
What we realized is that we wanted something as simple to use as SQLite or RabbitMQ, but focused on batch processing. By keeping it focused on a single task and storing its state locally, it would be easy to start, stop, develop with, and deploy.
It would also mean that it would be trivial to run many small individual queues in production. This is great for fault tolerance, as well as for performing upgrades one by one without any other part of the software experiencing downtime.
An added benefit is how easy it makes testing: you can spin up many independent opsqueues in parallel in your test suite within milliseconds. And indeed, opsqueue's own test suite works this way.
This was a big reason for us to choose the particular tech stack we ended up with: a single small Rust binary with an embedded SQLite database for metadata, with the actual data offloaded to object storage. Where desired, we rely on the excellent Litestream project to keep a continuous backup of the current state of the queue.
We mentioned before that we want the Consumer to decide in what order to pick up work. Why is this important?
We posit that most existing queueing systems are making a subpar choice by requiring a Producer system to decide early. The Consumer will always be in a much better position to decide which operation should be picked up next because it has more information available. We do allow producers to pass along extra metadata, so a producer could still say "the priority is 5" or "this work is for user 1234". But the final decision is up to the consumer.
This makes the system very flexible. For example, we can trivially ensure that certain operations only execute in a certain datacenter, optimize for maximal parallelism, ensure fairness between user accounts, or strike a balance between several of these goals. In short: it's a mechanism that future-proofs us for use cases that we might not even know about yet.
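As a purely hypothetical illustration of what 'the consumer decides' buys you (none of these names come from opsqueue's actual API), here is a toy strategy that keeps things fair between user accounts while still respecting the producer-supplied priority:

```python
import random
from collections import defaultdict

# Hypothetical pending chunks, each carrying metadata that the producer
# attached when it submitted the work.
pending_chunks = [
    {"chunk_id": 1, "metadata": {"user": "1234", "priority": 5}},
    {"chunk_id": 2, "metadata": {"user": "1234", "priority": 1}},
    {"chunk_id": 3, "metadata": {"user": "9999", "priority": 3}},
]

def pick_fairly(chunks):
    """Fairness between user accounts: pick a user at random, then take
    the highest-priority chunk belonging to that user."""
    by_user = defaultdict(list)
    for chunk in chunks:
        by_user[chunk["metadata"]["user"]].append(chunk)
    user = random.choice(list(by_user))
    return max(by_user[user], key=lambda c: c["metadata"]["priority"])

print("next up:", pick_fairly(pending_chunks)["chunk_id"])
```

Because this decision lives entirely on the consumer side, swapping in a different strategy (say, "only chunks tagged for this datacenter") does not require any change to the producers.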
One important question when designing a queueing system is: where do you store all the queued data?
We made the pragmatic decision to use object storage as the backing store. We ourselves use Google Cloud Storage (GCS) in production, but Opsqueue is compatible with many other options such as Amazon S3 and Azure Blob Storage, and for local development and testing, simple folders are supported as well.
By only keeping the metadata of each submission in the internal SQLite database and offloading the data to object storage, we make sure that both database size and the duration of network requests to the opsqueue program are kept small.
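Conceptually, the split looks like the sketch below. This is not opsqueue's actual schema, just an illustration of the idea, with a local folder standing in for GCS/S3: the queue database only ever sees a tiny metadata row per chunk, while the chunk contents go straight to object storage.

```python
import json
import sqlite3
from pathlib import Path

# Illustration of the metadata/data split (not opsqueue's real schema).
object_store = Path("object-store")      # stand-in for a GCS/S3 bucket
db = sqlite3.connect("queue-metadata.db")
db.execute("CREATE TABLE IF NOT EXISTS chunks (submission_id, chunk_index, object_key, status)")

def submit_chunk(submission_id, chunk_index, operations):
    key = f"submissions/{submission_id}/chunk-{chunk_index}.json"
    blob = object_store / key
    blob.parent.mkdir(parents=True, exist_ok=True)
    blob.write_text(json.dumps(operations))              # the heavy data
    db.execute("INSERT INTO chunks VALUES (?, ?, ?, 'pending')",
               (submission_id, chunk_index, key))        # only a tiny metadata row
    db.commit()

submit_chunk("sub-1", 0, [{"op_id": i, "payload": i * 2} for i in range(1_000)])
```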
To further reduce overhead, rather than keeping track of the progress of individual operations, the system only keeps track of chunks of them. The Producer decides on the chunk size, with the goal always being that a chunk should take on the order of a few seconds to complete. This reduces the amount of coordination that needs to happen between the queue and producers/consumers, and adds another knob you can tune to your particular use-case: A large chunk size means less communication overhead whereas a smaller chunk size means more potential for parallelism inside a submission.
The goal is always that the time to communicate with the queue should be insignificant compared to the time taken per operation, and a larger chunk size lets you spread out this overhead over more operations. But flipping that around: if individual operations take longer, you can get away with a smaller chunk size, which means that you can still get the benefits of higher parallelism and quick recovery from failures.
The nice thing about tuning the chunk size is that regardless of whether individual operations for a particular use case take a millisecond or a second on average, the average time to process a chunk can be kept more or less constant at this 'a few seconds' rate.
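As a back-of-the-envelope example (the three-second target below is just the rule of thumb from the text, not an opsqueue constant, and the latencies are example numbers):

```python
# Aim for chunks that take a few seconds each, per the rule of thumb above.
TARGET_CHUNK_SECONDS = 3.0

def suggest_chunk_size(avg_seconds_per_operation):
    return max(1, round(TARGET_CHUNK_SECONDS / avg_seconds_per_operation))

print(suggest_chunk_size(0.001))  # ~1 ms per operation -> 3000 operations per chunk
print(suggest_chunk_size(1.0))    # ~1 s per operation  -> 3 operations per chunk
```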
These two tricks are the main way we can support a scale of billions of operations and hundreds of connected consumers. And if a performance ceiling is ever hit, it will also be trivial to introduce sharding on top.
We have also integrated OpenTelemetry Tracing support into Opsqueue, to make it easier to understand what is going on in a production system. A trace is threaded all the way from the producer Python code through the queue and consumers working on a submission's chunks. As you can see in the following example, where two consumers are connected to a queue, the overhead of talking with the queue itself is sub-millisecond [3].
We have also implemented some other tricks to further improve Opsqueue's resilience and scalability[4]. In the repository you can find more details, and who knows, they might make an interesting subject for a future blog as well!
We implemented all core components of Opsqueue in Rust.
We chose this for two reasons, one of them being that it allows us to ship the opsqueue program as a small, self-contained binary.
Currently, the main language we have client support for is Python (besides Rust itself, of course). Writing a Producer or Consumer system looks just like writing any other normal Python code. You can find a simple example in the README. But behind the scenes, virtually all logic (streaming data to/from object storage, gracefully retrying on connection failure, etc.) is implemented in Rust and exposed through FFI bindings[5]. This means that for any important piece of logic there is only one implementation, which does not need to deal with any language-specific quirks. It also means that adding more language client libraries is a project that would take closer to an afternoon than a week.
Opsqueue has seen production usage for nearly half a year now, with great results! It is chugging along nicely. One queue running in production has finished more than 100,000 submissions and is currently operating at a rate of roughly 1,000,000 operations per hour. The overhead of talking with this queue service itself is still in the sub-millisecond range[3].
That said, we have not yet replaced all existing other 'generate-execute' implementations we have with it, and are not planning to do so in the short term.
You need to be very careful with rewrites, though sometimes a rewrite is the right choice.
In this case: any existing queues that are working well don't need to change. But if they do require maintenance in the future, or are hitting a clear scalability ceiling,
then switching over to the new system is certainly the first thing we will consider!
Opsqueue is out now. Please take it for a spin and let us know how it goes!
1: What makes it 'specialized' is that in the normal producer-consumer pattern, communication happens in only one direction, whereas in 'generate-execute' the Producer is also interested in the outcome of the Consumer(s), to check for errors and/or do further processing. ↩
2: If you want to fail only individual items, you can model this by making failure part of your output data structure. That is, rather than processing each operation with a function Input -> Output, use a function Input -> Result<Output, Error> instead. ↩
3: Not counting network overhead or the overhead of reading/writing chunks of operations to GCS, S3, or other object storage. The example trace shows the system running in a development environment, but similar metrics are observed in production. You can reproduce the trace locally by running the /libs/python/examples/tracing example yourself. ↩
4: Some examples are improving the performance of the consumer strategies by moving as much logic as possible to the creation of a submission (including using Fibonacci hashing for the 'random' strategy), the usage of Snowflake IDs to reduce write amplification, and the usage of websockets between consumers and the queue to detect network failures early. ↩
5: For the Python FFI bindings, we rely on the excellent PyO3 framework. ↩
Are you interested in working at Channable? Check out our vacancy page to see if we have an open position that suits you!