Here is pseudo code for what I'm trying to do:
rate_count = SELECT COUNT(id) FROM job WHERE last_processed_at >= ?
current_limit = rate_limit - rate_count
if current_limit > 0
UPDATE job SET state='processing'
WHERE id IN(
SELECT id FROM job
WHERE state='pending'
LIMIT :current_limit
)
I have it working except for concurrency issues. When run from multiple sessions at the same time, both sessions SELECT and therefore update the same stuff :(
I'm able to get the 2nd query atomic by adding FOR UPDATE in it's SELECT subquery. But I can't add FOR UPDATE to the first query because FOR UPDATE isn't allowed with aggregate functions
How can I make this piece an atomic transaction?
You can do FOR UPDATE within a subquery:
rate_count := COUNT(id)
FROM (
SELECT id FROM job
WHERE last_processed_at >= ? FOR UPDATE
) a;
You can also do this whole thing in a single query:
UPDATE job SET state='processing'
WHERE id IN (
SELECT id FROM job
WHERE state='pending'
LIMIT (SELECT GREATEST(0, rate_limit - COUNT(id))
FROM (SELECT id FROM job
WHERE last_processed_at >= ? FOR UPDATE) a
)
)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With