Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

fairly simple time based queue in MongoDB

Tags:

mongodb

nosql

I am trying to implement a fairly simple queue using MongoDB. I have a collection which a number of dumb workers needs to process. Each worker should search the collection for unprocessed work and then execute it.

The way I decide which work is unprocessed is based on a simple calculation.

Basically I have a collection of jobs which need to be performed at specific intervals, where the interval is stored in each document as interval, the worker will scan the collection for documents which have not been updated for at least the interval time.

An example of a document (_id field omitted) is:

{
  updated: 0360,
  interval: 60,
  work: "an object representing the work"
}

What I want is an atomic/blocking query (there are multiple workers) which returns a batch of documents where updated + interval < currentTime, where currentTime is the time on the database server, as well as sets the updated field to currentTime.

In other words:

  1. find: updated + interval < currentTime
  2. return a batch of these, say 30
  3. set: updated = currentTime

Any help is greatly appreciated!

like image 644
lmirosevic Avatar asked May 29 '26 20:05

lmirosevic


1 Answers

Since MongoDB does not support transactions, you can't safely put a pessimistic lock on a batch of items, unless you have a separate document for that -- more on that at the end.

Let's start with the query: You can't query for sth. like 'where x + y < z' in MongoDB. Instead, you'll have to use a field for the next due date, e.g. nextDue:

{
  "nextDue": "420",
  "work": { ... }
}

Now each worker can fetch a couple of items (NOTE: this is all pseudo-code, not a specific programming language):

var result = db.queue.find( { "nextDue": { $gt, startTime } }).limit(50);
// hint: you can do a random skip here to decrease the chances of collisions
// between workers.

foreach(rover in result)
{
    // pessimistic locking: '-1' indicates this is in progress. 
    // I'd recommend a flag instead, however...

    var currentItem = db.queue.findAndModify({ "_id" : rover.id, "nextDue" : {$gt, startTime}}, {$set : {"nextDue" : -1}});

    if(currentItem == null)
        continue; // hit a lock: another worker is processing this already

    // ... process job ...

    db.queue.findAndModify({ "_id" : rover.id, "nextDue" : "-1"}, {$set : {"nextDue" : yourNextDue }});
}

There are essentially two methods I see for the pessimistic locking of multiple documents. One is to create bucket for the documents you're trying to lock, put the job descriptors in the bucket and process those buckets. Since now, the bucket is a single object, you can rely on the atomic modifiers.

The other one is to use two-phase commit, which also creates another object for the transaction, but does not require you to move your documents into a different document. However, this is a somewhat elaborate pattern.

The pseudocode I presented above worked very well in two applications, but in both applications the individual jobs took quite some time to execute (half a second to several hours).

like image 105
mnemosyn Avatar answered May 31 '26 17:05

mnemosyn



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!