Fixing Duplicate Key Errors in Postgres Queue-Like Table

Tags: django, postgresql, python, queue

I have a table called queue with items that need to be processed:

CREATE TABLE public.queue (
    id serial NOT NULL
        CONSTRAINT queue_pkey
            PRIMARY KEY
);

Another table, process, represents processed items from the queue (e.g. a report has been generated for the item). In reality there are several such tables, one for each kind of processing that needs to be performed on an item. There is a one-to-one relation between queue and process – each item can be processed at most once.

CREATE TABLE public.process (
    id            serial  NOT NULL
        CONSTRAINT process_pkey
            PRIMARY KEY,
    queue_item_id integer NOT NULL
        CONSTRAINT process_queue_item_id_key
            UNIQUE
        CONSTRAINT process_queue_item_id_8953ec7b_fk_datastore
            REFERENCES public.queue
            DEFERRABLE INITIALLY DEFERRED
);

Here are some test data:

BEGIN;
TRUNCATE TABLE queue, process
    RESTART IDENTITY CASCADE;
INSERT INTO queue
SELECT GENERATE_SERIES(1, 10000);
COMMIT;

The worker that processes the items is implemented as follows. The code uses Django (a Python framework), but I am convinced that the error is not caused by Django or its ORM.

(For simplicity, there is no termination condition.)

import time

from django.db import transaction

# Queue and Process are the Django models mapped to the tables above.
while True:
    with transaction.atomic():
        queue_items = Queue.objects \
                          .filter(process=None) \
                          .order_by() \
                          .select_for_update(skip_locked=True, of=('self',))[:8]

        print('Generating report...')
        time.sleep(0.1)

        Process.objects.bulk_create(
            (Process(queue_item=q)
             for q in queue_items)
        )

Here is a transcript of the SQL queries sent to the database:

-- while True:

BEGIN;

SELECT queue."id"
FROM queue
         LEFT OUTER JOIN "process"
                         ON (queue."id" = "process"."queue_item_id")
WHERE "process"."id" IS NULL
LIMIT 8 FOR UPDATE OF queue SKIP LOCKED;

-- print('Generating report...')
-- time.sleep(0.1)

INSERT INTO "process" ("queue_item_id")
VALUES (1),
       (2),
       (3),
       (4),
       (5),
       (6),
       (7),
       (8)
RETURNING "process"."id";

COMMIT;

If I start one worker, the queue is processed perfectly fine. If I run two or more workers, I start getting this error:

duplicate key value violates unique constraint "process_queue_item_id_key"
DETAIL:  Key (queue_item_id)=(**) already exists.

How could another transaction create rows in process for queue items while those queue rows are locked?

What I tried:

  1. I tried rewriting the SELECT query with EXISTS:
SELECT "queue"."id"
  FROM "queue"
 WHERE NOT (EXISTS(SELECT U0."id", U0."queue_item_id" FROM "process" U0 WHERE U0."queue_item_id" = "queue"."id"))
 LIMIT 8
   FOR UPDATE OF "queue" SKIP LOCKED

This did not help; the same error occurs.

  2. If I arrange the rows randomly, the error occurs much later (almost at the end of the queue):
SELECT "queue"."id"
  FROM "queue"
  LEFT OUTER JOIN "process"
    ON ("queue"."id" = "process"."queue_item_id")
 WHERE "process"."id" IS NULL
 ORDER BY RANDOM()
 LIMIT 8
   FOR UPDATE OF "queue" SKIP LOCKED
  3. I put a breakpoint in the middle of the transaction and, from another connection, checked that the row locking works as I would expect:
SELECT id
FROM queue
WHERE id NOT IN (
    SELECT id
    FROM queue
        FOR UPDATE SKIP LOCKED
);
  • My Postgres version: PostgreSQL 13.1, compiled by Visual C++ build 1914, 64-bit.
  • Each worker has its own connection to Postgres with default isolation level (read committed).

Best Answer

When a worker's transaction commits, the row locks it held are released immediately, for everyone. But the rows it inserted are not visible to snapshots that started before it committed. So the just-unlocked queue rows are eligible for another worker to lock, while that worker is still unable to see the inserted process rows that would have excluded those queue rows from its selection. When you select a row FOR UPDATE, the anticipation is that you will update that row: under READ COMMITTED, the recheck Postgres performs when acquiring the lock re-evaluates the WHERE clause only against newer versions of the locked row itself, never against rows inserted into another table in the meantime.
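
One way out, then, is to make the locked row itself carry the processed state, so the FOR UPDATE recheck has something to re-evaluate. Below is a minimal sketch of that approach; it assumes a hypothetical boolean column processed added to queue (not part of the original schema). Because a competing SELECT ... FOR UPDATE rechecks its WHERE clause against the newest committed version of each row it is about to lock, rows flagged as processed drop out of the result even when the competing worker's snapshot predates the commit.

import time

from django.db import transaction

# Sketch only: assumes Queue has a hypothetical "processed" column,
# e.g. processed = models.BooleanField(default=False).
while True:
    with transaction.atomic():
        queue_items = list(
            Queue.objects
                 .filter(processed=False)
                 .order_by()
                 .select_for_update(skip_locked=True, of=('self',))[:8]
        )
        if not queue_items:
            break  # queue drained

        print('Generating report...')
        time.sleep(0.1)

        Process.objects.bulk_create(
            [Process(queue_item=q) for q in queue_items]
        )
        # Updating the locked rows themselves is what lets concurrent
        # workers' FOR UPDATE rechecks exclude them.
        Queue.objects.filter(
            pk__in=[q.pk for q in queue_items]
        ).update(processed=True)

Alternatively, if occasionally generating a report twice is acceptable, bulk_create(..., ignore_conflicts=True) (Django's spelling of Postgres's ON CONFLICT DO NOTHING) turns the duplicate inserts into no-ops instead of errors.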