
PostgreSQL FOR UPDATE SKIP LOCKED still selects duplicate rows

I am using PostgreSQL as a job queue. This is the query I use to retrieve a job and update its state:

        UPDATE requests AS re
        SET
          started_at = NOW(),
          finished_at = NULL
        FROM (
          SELECT
            _re.*
          FROM requests AS _re
          WHERE
            _re.state = 'pending'
          AND
            _re.started_at IS NULL
          LIMIT 1
          FOR UPDATE SKIP LOCKED
        ) AS sub
        WHERE re.id = sub.id
        RETURNING
          sub.*

I have several machines. Each machine runs one process with several threads, and each thread runs one worker. All workers in the same process share a connection pool, typically holding 10 to 20 connections.

The problem is that the query above returns some rows more than once!

I cannot find the reason. Could anyone help?

For more detail: I am using Python 3 and psycopg2.


Update:

I have tried @a_horse_with_no_name's answer, but it does not seem to work.

I noticed that one request was retrieved by two queries, with started_at updated to:

2016-04-21 14:23:06.970897+08

and

2016-04-21 14:23:06.831345+08

which differ by only 0.14 s.

I am wondering whether, at the time those two connections execute the inner SELECT subquery, neither lock has been established yet?


Update:

To be more precise, I have 200 workers (i.e. 200 threads) in 1 process on 1 machine.

HanXu asked Apr 21 '16 05:04


2 Answers

Please also note that it's essential that each thread has its own connection if you do not want them to get in each other's way.

If your application uses multiple threads of execution, they cannot share a connection concurrently. You must either explicitly control access to the connection (using mutexes) or use a connection for each thread. If each thread uses its own connection, you will need to use the AT clause to specify which connection the thread will use.

from: http://www.postgresql.org/docs/9.5/static/ecpg-connect.html

All kinds of weird things happen if two threads share the same connection. I believe this is what is happening in your case. If you take a lock on one connection, every other thread that uses the same connection has access to the locked rows.
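To make the "one connection per thread" idea concrete, here is a minimal sketch of a worker that checks a connection out of the pool for exactly one claim and hands it back afterwards. It assumes `pool` behaves like `psycopg2.pool.ThreadedConnectionPool` (that is, it offers `getconn()` and `putconn()`); the helper names `checked_out` and `claim_job` are made up for this illustration, and the query is a trimmed variant of the one in the question that returns only the `id`.

```python
from contextlib import contextmanager

# Trimmed variant of the question's query; it returns only the claimed id.
CLAIM_SQL = """
UPDATE requests AS re
SET started_at = NOW(), finished_at = NULL
FROM (
    SELECT _re.id
    FROM requests AS _re
    WHERE _re.state = 'pending' AND _re.started_at IS NULL
    LIMIT 1
    FOR UPDATE SKIP LOCKED
) AS sub
WHERE re.id = sub.id
RETURNING re.id
"""


@contextmanager
def checked_out(pool):
    """Borrow one connection for the calling thread only (hypothetical helper).

    Assumes `pool` has getconn()/putconn(), as psycopg2's
    ThreadedConnectionPool does.
    """
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()      # end the transaction, releasing the row lock
    except Exception:
        conn.rollback()    # never return a connection with an open transaction
        raise
    finally:
        pool.putconn(conn)


def claim_job(pool):
    """Claim one pending job and return its id, or None if nothing is pending."""
    with checked_out(pool) as conn:
        cur = conn.cursor()
        try:
            cur.execute(CLAIM_SQL)
            row = cur.fetchone()
        finally:
            cur.close()
    return row[0] if row else None
```

Each worker thread would call `claim_job(pool)` in its loop. With 200 threads, the pool has to be large enough for the number of threads that claim at the same time, otherwise `getconn()` fails once the pool is exhausted; what matters here is that no connection object is ever used by two threads at once.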

Permit me to suggest an alternative approach that is really simple: use Redis as a queue. You can either use redis-py with its lpush/rpop methods, or use python-rq.

e4c5 answered Oct 07 '22 19:10


There is a chance that the locking transaction has not yet been issued at the time of the select, or that the lock is lost by the time the results of the select are ready and the update statement begins. Have you tried explicitly beginning a transaction?

BEGIN;
  WITH req AS (
    SELECT id
    FROM requests AS _re
    WHERE _re.state = 'pending' AND _re.started_at IS NULL
    LIMIT 1 FOR UPDATE SKIP LOCKED
    )
  UPDATE requests SET started_at = NOW(), finished_at = NULL
  FROM req
  WHERE requests.id = req.id;
COMMIT;
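
One way to check where the duplicates come from is to record which backend session and transaction performed each claim. The sketch below is only an assumption about how that could be done from psycopg2: it adds PostgreSQL's `pg_backend_pid()` and `txid_current()` to the `RETURNING` clause of the statement above. The names `claim_with_diagnostics` and `duplicate_claims` are hypothetical, and `conn` is assumed to be an ordinary psycopg2 connection that the caller commits.

```python
from collections import defaultdict

# Same claim as above, but also reporting the session and transaction id.
DIAG_CLAIM_SQL = """
WITH req AS (
    SELECT id
    FROM requests
    WHERE state = 'pending' AND started_at IS NULL
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE requests
SET started_at = NOW(), finished_at = NULL
FROM req
WHERE requests.id = req.id
RETURNING requests.id, pg_backend_pid(), txid_current()
"""


def claim_with_diagnostics(conn):
    """Return (job_id, backend_pid, txid), or None if no job was claimed.

    The caller is expected to commit or roll back on `conn` afterwards.
    """
    cur = conn.cursor()
    try:
        cur.execute(DIAG_CLAIM_SQL)
        return cur.fetchone()
    finally:
        cur.close()


def duplicate_claims(claims):
    """Group logged claims by job id and keep the jobs claimed more than once.

    `claims` is an iterable of (job_id, backend_pid, txid) tuples.
    """
    by_job = defaultdict(list)
    for job_id, pid, txid in claims:
        by_job[job_id].append((pid, txid))
    return {job: seen for job, seen in by_job.items() if len(seen) > 1}
```

If a job shows up twice with the same backend pid, both claims went through the same connection, which points at connection sharing between threads rather than at the SQL itself.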

Ezequiel Tolnay answered Oct 07 '22 19:10