PostgreSQL – How to Limit Multiple and Concurrent Executions of a Task

concurrencypostgresqlredis

Let's assume you have a task FOO that can be queued once every minute, and a pool of 50 workers that can be paused. The queue is paused for 10 minutes, and 10 FOO tasks are queued. When the queue is resumed, the 10 FOO tasks will be executed almost concurrently (because there are more workers than tasks).

In this case, I need to ensure that no more than 1 FOO task per minute (time can vary) is performed.

One solution, using Redis, is to take advantage of Redis atomic and the TTL of a key. When a FOO task starts, it checks if the key worker:FOO exists. If does, then it exists, if it does not it sets the value and a TTL to the maximum frequency. This is easy to achieve using SETNX worker:FOO whatever and then using TTL worker:FOO if the previous command returned 1.

Because SETNX is atomic, I won't fall into the case where two FOO tasks are executed because of the race condition between the GET and the SET.

Now the question is: what is the correct way to achieve the same result using PostgreSQL? I can have a table with a key and a executed_on timestamp value, but how can I ensure that there is no case where two FOO tasks are both executed because of the delay between FOO 1 checks the record and writes a lock?

Best Answer

Since you're trying to serialize work, I'd update a record in a table.

CREATE TABLE task_keys (
  task varchar(10) primary key,
  last_executed timestamp with time zone not null,
  by_worker_id integer
);

INSERT INTO task_keys(task, last_executed) 
VALUES ('FOO', '-infinity');

then to see if you can run a task yet:

UPDATE task_keys SET
  last_executed = current_timestamp,
  by_worker = $1
WHERE task = 'FOO'
  AND last_executed < (current_timestamp - INTERVAL '1' MINUTE)
RETURNING *;

The isolation rules of READ COMMITTED guarantee that if this query successfully updates the table and returns a row, no other query can have concurrently done so. A row lock is taken on the relevant task_keys row. If another UPDATE tries to affect the same row, it'll wait until the row lock is released by a commit or rollback of the holding transaction... then it will re-check the WHERE clause. If the other tx committed then the WHERE clause will no longer match, so it'll affect zero rows.

See the documentation on transaction isolation.

If you need concurrency this gets a little tricker. What you really want is a token pool that refills on a timer, where workers can grab tokens from the pool to do work. That's effectively what we're doing here, with one token - so one option is to add more rows for the same task and grab the first task where the last_executed timestamp is old enough.

There are two flaws with this whole approach though:

  • It doesn't know when a task finishes, so long running tasks could overlap; and
  • It doesn't care if a task succeeds or fails

To solve those you need to use a proper work queue implementation. Those are currently very difficult to implement correctly in the database, so I suggest you look at using an external message queue / work queue system to manage them. In PostgreSQL 9.5 the new FOR UPDATE SKIP LOCKED feature will make implementing work queues like this in the database quite simple, though.

BTW, advisory locking is often a good choice for this sort of thing, but it won't help you with the need to expire the lock automatically after a certain amount of time.