I am using PostgreSQL as a job queue. Following is my query to retrieve a job and update its state:
UPDATE requests AS re
SET
    started_at = NOW(),
    finished_at = NULL
FROM (
    SELECT _re.*
    FROM requests AS _re
    WHERE
        _re.state = 'pending'
        AND _re.started_at IS NULL
    LIMIT 1
    FOR UPDATE SKIP LOCKED
) AS sub
WHERE re.id = sub.id
RETURNING sub.*
Now, I have several machines; on each machine I run one process with several threads, and each thread runs a worker. All workers in the same process share a connection pool, which typically has 10 to 20 connections.
The problem is that the above query sometimes returns the same row more than once!
I cannot find any reason for this. Could anyone help?
To be more detailed, I am using Python3 and psycopg2.
Update:
I have tried @a_horse_with_no_name's answer, but it does not seem to work.
I noticed that one request was retrieved by two queries, with started_at updated to
2016-04-21 14:23:06.970897+08
and
2016-04-21 14:23:06.831345+08
which differ by only 0.14 s.
I am wondering whether, at the time those two connections executed the inner SELECT subquery, neither lock had been established yet?
Update:
To be more precise, I have 200 workers (i.e. 200 threads) in 1 process on 1 machine.
Please also note that it is essential that each thread has its own connection if you do not want them to get in each other's way.
If your application uses multiple threads of execution, they cannot share a connection concurrently. You must either explicitly control access to the connection (using mutexes) or use a connection for each thread. If each thread uses its own connection, you will need to use the AT clause to specify which connection the thread will use.
from: http://www.postgresql.org/docs/9.5/static/ecpg-connect.html
All kinds of weird things happen if two threads share the same connection. I believe this is what is happening in your case. If you take a lock with one connection, all other threads that use the same connection will have access to the locked objects.
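With psycopg2, one way to avoid this is to check a dedicated connection out of a psycopg2.pool.ThreadedConnectionPool for each worker thread instead of sharing a single connection object between threads. A minimal sketch, reusing the query from the question (the DSN, pool size, and thread count are made-up examples):
import threading
import psycopg2
from psycopg2.pool import ThreadedConnectionPool

# DSN and pool size are assumptions -- adjust for your setup.
pool = ThreadedConnectionPool(1, 20, "dbname=jobs")

CLAIM_JOB = """
    UPDATE requests AS re
    SET started_at = NOW(), finished_at = NULL
    FROM (
        SELECT _re.*
        FROM requests AS _re
        WHERE _re.state = 'pending' AND _re.started_at IS NULL
        LIMIT 1
        FOR UPDATE SKIP LOCKED
    ) AS sub
    WHERE re.id = sub.id
    RETURNING sub.*
"""

def worker():
    conn = pool.getconn()              # this connection is used by this thread only
    try:
        while True:
            with conn.cursor() as cur:
                cur.execute(CLAIM_JOB)
                job = cur.fetchone()   # None if no pending request was found
            conn.commit()              # the row lock is held until this commit
            if job is None:
                break                  # a real worker would sleep and poll again
            # ... process the job here ...
    finally:
        pool.putconn(conn)

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()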
Permit me to suggest an alternative approach that is really simple: use Redis as a queue. You can either make use of redis-py directly with the lpush/rpop methods, or use python-rq.
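A minimal sketch with redis-py, assuming a local Redis instance (the list name 'requests' and the job payload are just illustrations; brpop is the blocking variant of rpop):
import json
import redis

r = redis.Redis()                       # assumes Redis running on localhost:6379

# Producer: push a job onto the left end of the list.
r.lpush('requests', json.dumps({'id': 42, 'state': 'pending'}))

# Worker: pop from the right end; BRPOP blocks until a job is available
# and hands each job to exactly one worker.
_key, raw = r.brpop('requests')
job = json.loads(raw)
If you prefer not to manage the list yourself, python-rq builds this same pattern into ready-made queue and worker classes.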
There is a chance that a locking transaction has not yet been opened at the time of the SELECT, or that the lock is lost by the time the results of the SELECT are ready and the UPDATE statement begins. Have you tried explicitly beginning a transaction?
BEGIN;
WITH req AS (
    SELECT id
    FROM requests AS _re
    WHERE _re.state = 'pending' AND _re.started_at IS NULL
    LIMIT 1 FOR UPDATE SKIP LOCKED
)
UPDATE requests
SET started_at = NOW(), finished_at = NULL
FROM req
WHERE requests.id = req.id;
COMMIT;
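In psycopg2 that means leaving autocommit off and committing yourself, so the row stays locked from the SELECT until the COMMIT. A sketch of the same statement (the DSN is an assumption, and a RETURNING clause is added so the worker sees which row it claimed):
import psycopg2

conn = psycopg2.connect("dbname=jobs")    # assumed DSN
conn.autocommit = False                   # the default; the first execute() opens a transaction

with conn.cursor() as cur:
    cur.execute("""
        WITH req AS (
            SELECT id
            FROM requests AS _re
            WHERE _re.state = 'pending' AND _re.started_at IS NULL
            LIMIT 1 FOR UPDATE SKIP LOCKED
        )
        UPDATE requests SET started_at = NOW(), finished_at = NULL
        FROM req
        WHERE requests.id = req.id
        RETURNING requests.*
    """)
    job = cur.fetchone()                  # None if nothing was pending

conn.commit()                             # the row lock is released here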