I have a database table with ~50K rows in it; each row represents a job that needs to be done. I have a program that extracts a job from the DB, does the job, and puts the result back in the DB. (This system is running right now.)
Now I want to allow more than one processing task to do jobs, but be sure that no job is done twice (as a performance concern, not because it would cause other problems). Because the access is by way of a stored procedure, my current thought is to replace said stored procedure with something that looks something like this:
update tbl
set owner = connection_id()
where available and owner is null limit 1;
select stuff
from tbl
where owner = connection_id();
BTW: worker tasks might drop their connection between getting a job and submitting the results. Also, I don't expect the DB to come even close to being the bottleneck unless I mess that part up (~5 jobs per minute).
Are there any issues with this? Is there a better way to do this?
Note: the "Database as an IPC anti-pattern" is only slightly apropos here because
A queuing system is composed of producers and consumers. A producer enqueues messages (writes messages to a database) and a consumer dequeues messages (reads messages from the database).
A queue can be the target of a SELECT statement. However, the contents of a queue can only be modified using statements that operate on Service Broker conversations, such as SEND, RECEIVE, and END CONVERSATION. A queue cannot be the target of an INSERT, UPDATE, DELETE, or TRUNCATE statement.
The Message Queue task allows you to use Message Queuing (also known as MSMQ) to send and receive messages between SQL Server Integration Services packages, or to send messages to an application queue that is processed by a custom application.
Q4M (Queue for MySQL) is a message queue licensed under GPL that works as a pluggable storage engine of MySQL, designed to be robust, fast, and flexible. It is already production quality, and is used by several web services (see Users of Q4M).
The best way to implement a job queue in a relational database system is to use SKIP LOCKED. SKIP LOCKED is a lock acquisition option that applies to both read/share (FOR SHARE) and write/exclusive (FOR UPDATE) locks, and it is widely supported nowadays (for example, by Oracle, PostgreSQL 9.5+, and MySQL 8.0+).
Now, consider we have the following post table:
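A minimal sketch of such a post table, reconstructed from the columns used in the queries below (the exact types are assumptions), might be:

CREATE TABLE post (
    id     BIGINT PRIMARY KEY,
    title  VARCHAR(255),
    body   TEXT,
    status SMALLINT NOT NULL DEFAULT 0  -- 0 = PENDING, 1 = APPROVED, 2 = SPAM
);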
The status column is used as an Enum, having the values of PENDING (0), APPROVED (1), and SPAM (2). If we have multiple concurrent users trying to moderate the post records, we need a way to coordinate their efforts to avoid having two moderators review the same post row.
So, SKIP LOCKED is exactly what we need. If two concurrent users, Alice and Bob, execute the following SELECT queries, which lock the post records exclusively while also adding the SKIP LOCKED option:
[Alice]:
SELECT
p.id AS id1_0_,
p.body AS body2_0_,
p.status AS status3_0_,
p.title AS title4_0_
FROM
post p
WHERE
p.status = 0
ORDER BY
p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED
[Bob]:
SELECT
p.id AS id1_0_,
p.body AS body2_0_,
p.status AS status3_0_,
p.title AS title4_0_
FROM
post p
WHERE
p.status = 0
ORDER BY
p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED
We can see that Alice can select the first two entries while Bob selects the next two records. Without SKIP LOCKED, Bob's lock acquisition request would block until Alice releases the lock on the first two records.
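Applied to the original job table, a worker's claim-and-process cycle could look roughly like the sketch below. This assumes MySQL 8.0+ (which supports SKIP LOCKED) and hypothetical column names (job_id, status, result); it illustrates the pattern rather than the poster's actual schema.

-- Claim one available job; rows already locked by other workers are skipped.
START TRANSACTION;

SELECT job_id
FROM tbl
WHERE status = 'available'
ORDER BY job_id
LIMIT 1
FOR UPDATE SKIP LOCKED;

-- ... the application does the work for the returned job_id ...

UPDATE tbl
SET status = 'done', result = '...'   -- store the job's result
WHERE job_id = ?;                     -- the id returned by the SELECT above

COMMIT;

-- Note: the row lock is held for the duration of the job. Alternatively, mark the
-- row as 'processing' and COMMIT immediately, then update it again when finished.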
Here's what I've used successfully in the past:
MsgQueue table schema
MsgId identity -- NOT NULL
MsgTypeCode varchar(20) -- NOT NULL
SourceCode varchar(20) -- process inserting the message -- NULLable
State char(1) -- 'N'ew if queued, 'A'(ctive) if processing, 'C'ompleted, default 'N' -- NOT NULL
CreateTime datetime -- default GETDATE() -- NOT NULL
Msg varchar(255) -- NULLable
Your message types are what you'd expect - messages that conform to a contract between the process(es) inserting and the process(es) reading, structured with XML or your other choice of representation (JSON would be handy in some cases, for instance).
Then 0-to-n processes can be inserting, and 0-to-n processes can be reading and processing the messages. Each reading process typically handles a single message type. Multiple instances of a process type can be running for load-balancing.
The reader pulls one message and changes the state to "A"ctive while it works on it. When it's done it changes the state to "C"omplete. It can delete the message or not depending on whether you want to keep the audit trail. Messages of State = 'N' are pulled in MsgType/Timestamp order, so there's an index on MsgType + State + CreateTime.
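In concrete DDL, the table and that index might look like the following sketch (SQL Server flavored; types and defaults beyond those listed above are assumptions):

CREATE TABLE MsgQueue (
    MsgId       INT IDENTITY(1,1) NOT NULL PRIMARY KEY,
    MsgTypeCode VARCHAR(20)  NOT NULL,
    SourceCode  VARCHAR(20)  NULL,                      -- process inserting the message
    State       CHAR(1)      NOT NULL DEFAULT 'N',      -- 'N'ew, 'A'ctive, 'C'ompleted
    CreateTime  DATETIME     NOT NULL DEFAULT GETDATE(),
    Msg         VARCHAR(255) NULL
);

-- Supports pulling 'N'ew messages of a given type in CreateTime order
CREATE INDEX IX_MsgQueue_Pull ON MsgQueue (MsgTypeCode, State, CreateTime);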
Variations:
State for "E"rror.
Column for Reader process code.
Timestamps for state transitions.
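In DDL terms, those variations might amount to something like this sketch (the column names are illustrative, not from the original answer):

ALTER TABLE MsgQueue ADD
    ReaderCode   VARCHAR(20) NULL,   -- which reader process claimed the message
    ActiveTime   DATETIME    NULL,   -- when State went 'N' -> 'A'
    CompleteTime DATETIME    NULL;   -- when State went 'A' -> 'C' (or 'E' on error)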
This has provided a nice, scalable, visible, simple mechanism for doing a number of things like you are describing. If you have a basic understanding of databases, it's pretty foolproof and extensible.
Code from comments:
CREATE PROCEDURE GetMessage (@MsgType VARCHAR(20))
AS
DECLARE @MsgId INT
BEGIN TRAN
SELECT TOP 1 @MsgId = MsgId
FROM MsgQueue
WHERE MsgTypeCode = @MsgType AND State = 'N'
ORDER BY CreateTime
IF @MsgId IS NOT NULL
BEGIN
UPDATE MsgQueue
SET State = 'A'
WHERE MsgId = @MsgId
SELECT MsgId, Msg
FROM MsgQueue
WHERE MsgId = @MsgId
END
ELSE
BEGIN
SELECT MsgId = NULL, Msg = NULL
END
COMMIT TRAN
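As written, two concurrent callers running under the default isolation level can both read the same MsgId before either one flips it to 'A'. A common way to close that window on SQL Server (my addition, not part of the original answer) is to claim the row in a single atomic statement with UPDLOCK/READPAST hints, roughly:

CREATE PROCEDURE GetMessageAtomic (@MsgType VARCHAR(20))  -- hypothetical name
AS
BEGIN
    SET NOCOUNT ON;

    -- READPAST skips rows other callers have already locked; UPDLOCK/ROWLOCK
    -- holds the chosen row so only this caller can flip it to 'A'ctive.
    WITH NextMsg AS (
        SELECT TOP 1 MsgId, State, Msg
        FROM MsgQueue WITH (UPDLOCK, READPAST, ROWLOCK)
        WHERE MsgTypeCode = @MsgType AND State = 'N'
        ORDER BY CreateTime
    )
    UPDATE NextMsg
    SET State = 'A'
    OUTPUT inserted.MsgId, inserted.Msg;   -- returns the claimed message (empty if none)
END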