December 2, 2022
Summary: This blog post describes a high-frequency task scheduler we have developed and the problems it solves, in particular the CPU usage peaks caused by the thundering herd problem.
Most of Channable’s processing revolves around bulk updates to external parties that happen only a few times per day. For tasks that need to be executed frequently and quickly, however, we have developed a separate project. This system handles the synchronization of orders, shipments, returns, and stock and price updates. It runs a large number of recurring tasks per minute to synchronize data between marketplaces and sellers' e-commerce platforms. This blog post explains how we created a system in Python to schedule these tasks. The main contribution of this scheduler is that it solves the “thundering herd” problem, where the database server is overloaded for certain periods of time but underloaded at other times. Moreover, it makes it easy to scale horizontally, isolates problems per customer/marketplace, and allows better monitoring of tasks that are too slow.
We have a standalone project that executes all business logic related to order connections. It handles the main recurring tasks described above: synchronizing orders, shipments, returns, and stock and price updates.
These tasks run continuously at various intervals. While running them, we send a large number of HTTPS requests per unit of time. The whole project is based on Python’s asyncio library, and we use aiohttp for sending HTTPS requests. Our solution is therefore IO-bound and uses relatively little CPU on the production server.
The setup we originally had for running these tasks periodically used cron jobs. For example, the orders cron job would collect all active order connections in Channable. For all of these order connections, we would query the marketplace for new orders that didn’t exist in our database yet. Subsequently, we would store the orders in the database and finally send them to the e-commerce platform of the seller.
This orders cron job introduced an imbalance in the CPU load on the database, since a single cron job was handling all order connections in one process. Every 5 minutes, a peak appeared in the CPU load at the moment the server started its cron job to check for and fetch new orders. Once the job finished, the CPU load would remain relatively low until the cron job started again a few minutes later. This phenomenon of an uneven spread of load over time is called the thundering herd problem. It leads to suboptimal performance of the tasks that are run and an unnecessarily overloaded server.
Executing recurring tasks with cron jobs limits the solution to a single Python process (per cron job), which is CPU bound. Another limitation of the cron job setup was that it was not easily possible to monitor which customers or marketplaces were having problems. Moreover, if a task ran too long for one customer, other customers would be affected as well. Finally, there was no way to prevent tasks from running simultaneously. We need this constraint to prevent fetching shipment information from platforms for orders that we had not yet sent to the platform, which could happen when the orders and shipments cron jobs overlapped because one of them took too long.
The cron jobs worked well when Channable was still a relatively small company, but at some point it became clear that they were not a long-term solution, since they could not scale due to the constraints mentioned above.
As a result of the growing limitations of using cron jobs, it was clear that we needed a more robust, easy-to-maintain and performant system for running tasks at scheduled intervals. The solution we aimed for had to spread the load evenly over time, scale horizontally by adding task runners, isolate problems per customer/marketplace, allow better monitoring of tasks that are too slow, prevent certain task types from running simultaneously, and work with PostgreSQL and asyncio.
Two tables track the state of all tasks: task_runs and task_schedule. The former is used for tracking the state of every task run, and the latter is used for tracking the schedule of each project. A record gets inserted into task_runs when a task is started, and the finished column is updated when the task finishes. The task column references a row in the task schedule.
(Definition of the task_runs table)
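As an illustration, a minimal sketch of what this table could look like; only the task and finished columns are mentioned above, so the remaining column names and all types are assumptions:

CREATE TABLE task_runs (
    id       BIGSERIAL PRIMARY KEY,
    task     BIGINT NOT NULL,                     -- references a row in task_schedule
    started  TIMESTAMPTZ NOT NULL DEFAULT NOW(),  -- assumed name for the start time
    finished TIMESTAMPTZ                          -- NULL until the task finishes
);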
In task_schedule, for each project and task type, a row is stored that specifies when the next run should be started (at the earliest). Typically, not all tasks are needed for a project, because they are only used for channel-specific features. For those tasks, the active column is false.
(Definition of the task_schedule table)
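Again as an illustration, a minimal sketch of this table using the columns that appear in the scheduling query further down (the exact types are assumptions):

CREATE TABLE task_schedule (
    id                BIGSERIAL PRIMARY KEY,
    project_id        BIGINT NOT NULL,       -- references a row in projects
    task_type         TEXT NOT NULL,
    class_id          INT NOT NULL,          -- first key of the advisory lock pair
    cool_down_minutes INT NOT NULL,          -- minimum interval between two runs
    next_run_due_at   TIMESTAMPTZ NOT NULL,  -- earliest moment the next run may start
    active            BOOLEAN NOT NULL DEFAULT TRUE
);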
An important property of the task runners is that they should not take on the same tasks. Therefore, a task has to be assigned to a task runner as soon as the runner starts executing it. We decided to use session-bound advisory locks to implement this logic. This has two advantages: there is no need to keep a transaction open while executing a task, and when a worker dies, its locks are automatically released. Keeping a transaction open to hold the lock for the entire duration of a task is not an option, because it would require a massive number of connections to PostgreSQL. As mentioned above, the workload of a task is almost exclusively waiting for IO, which allows us to run a large number of tasks concurrently on a small number of CPUs. A task can take an arbitrary amount of time, depending on the size of a project and which API the task is interacting with. Moreover, due to customer growth and the addition of new features, we are running an increasing number of tasks over time, so keeping a transaction open would not scale.
The advisory locks have the downside that the number of locks is limited, but that is not a problem for us. Additionally, we need to make sure that the lock numbers are only used for the task types in the task schedule.
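As a small illustration of how session-level advisory locks behave (the key values 17 and 42 are arbitrary examples standing in for a class id and a project id):

SELECT pg_try_advisory_lock(17, 42);  -- true if the lock was acquired,
                                      -- false if another session already holds it
SELECT pg_advisory_unlock(17, 42);    -- release it explicitly when the task is done;
                                      -- it is also released when the session ends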
Tasks that are to be executed are determined by a single PostgreSQL query. The inner part of that query is shown in the next code block; the full query follows in the second code block. The inner part fetches rows from the schedule that are due and not covered by an advisory lock, and it excludes tasks that are inactive or belong to projects that are not deployed.
SELECT
ts2.class_id,
ts2.project_id,
MIN(ts2.id) AS id
FROM task_schedule ts2
JOIN projects p ON ts2.project_id = p.id
WHERE ts2.next_run_due_at < NOW()
AND ts2.active = TRUE
AND p.deployed = TRUE
AND (ts2.class_id, ts2.project_id) NOT IN (
SELECT classid, objid
FROM pg_locks
WHERE locktype = 'advisory'
)
GROUP BY (ts2.class_id, ts2.project_id)
ORDER BY (ts2.class_id, ts2.project_id)
LIMIT 45
The full query uses the subquery above in an INNER JOIN on the task schedule table. The outer query selects rows from the task schedule for which it can obtain an advisory lock on the task type and project id. Apart from the task id, project id, task type and class id, it returns the deadline for the next run. This deadline is calculated using cool_down_minutes, with a random bias of between -4 and +4 seconds per minute of cool-down; for example, with a cool-down of 5 minutes the next run becomes due roughly 5 minutes from now, give or take at most 20 seconds.
SELECT ts.id,
ts.project_id,
ts.task_type,
ts.class_id,
NOW()
+ INTERVAL '1 minute' * ts.cool_down_minutes
- (RANDOM() * 8 - 4) * INTERVAL '1 second' * ts.cool_down_minutes
AS deadline
FROM task_schedule ts
JOIN (
SELECT ts2.class_id,
ts2.project_id,
MIN(ts2.id) AS id
FROM task_schedule ts2
JOIN projects p ON ts2.project_id = p.id
WHERE ts2.next_run_due_at < NOW()
AND ts2.active = TRUE
AND p.deployed = TRUE
AND (ts2.class_id, ts2.project_id) NOT IN (
SELECT classid, objid
FROM pg_locks
WHERE locktype = 'advisory'
)
GROUP BY (ts2.class_id, ts2.project_id)
ORDER BY (ts2.class_id, ts2.project_id)
LIMIT 45
) AS runnable ON runnable.id = ts.id
WHERE pg_try_advisory_lock(runnable.class_id, runnable.project_id::int);
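What the query above does not show is the bookkeeping once a runner has picked up and finished a task. A sketch of what this could look like, assuming parameters such as :task_id and :deadline are filled in by the task runner (the exact statements are an illustration, not the actual implementation):

-- When a runner picks up a task (after obtaining the advisory lock), record the run:
INSERT INTO task_runs (task) VALUES (:task_id) RETURNING id;

-- When the task finishes, mark the run as finished and schedule the next run
-- using the deadline returned by the scheduling query:
UPDATE task_runs SET finished = NOW() WHERE id = :task_run_id;
UPDATE task_schedule SET next_run_due_at = :deadline WHERE id = :task_id;

-- Finally, release the session-level advisory lock so that other runners can
-- pick up this (class_id, project_id) combination again:
SELECT pg_advisory_unlock(:class_id, :project_id);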
The scheduling system consists solely of 8 task runners that each fetch tasks to pick up on their own; there is no central coordinator. If one of the runners is shut down, the other task runners continue operating.
Since we intended to create a robust solution, it is wise to analyze what happens when things go sideways. For example, if a worker dies, its session is closed: its tasks remain unfinished, but the locks are released so that another worker can pick them up. If a task takes a long time, only that project is affected. Finally, if the query to fetch work from the task schedule fails, all task runners fail immediately and nothing is processed incorrectly.
The main result is shown in figure 4: the thundering herd problem that was visible in figure 1 is gone.
The overall CPU usage is not only more balanced over time, but is also lower. We presume this is because there is less contention on the database as a result of the lack of a thundering herd.
The task scheduler described in this blog post aims to solve the “thundering herd” problem, where the database server is overloaded for certain periods of time but underloaded at others. The system of task runners we created not only solves this, but is also easy to use, supports PostgreSQL and asyncio, is statically typed, and lets us scale the number of task runners easily. Moreover, it can prevent certain task types from running at the same time, isolates problems per customer/marketplace, and allows better monitoring of tasks that are too slow. This leads to a system that is easy to maintain, robust, and performant.
Are you interested in working at Channable? Check out our vacancy page to see if we have an open position that suits you!