
How can we avoid multiple Rebus messages when a message has been timed out?

We are using Rebus as a queue system with SQL Server. We have several recipients for different types of messages. Each message can be handled by several workers of a certain type. One message should only be handled/processed by one worker (the first one that pulls it). If a worker for some reason can't finish it, it postpones the message using the timeout service.

If I have understood it correctly, it becomes a TimeoutRequest and is put in the timeouts table. When it's time to rerun, it becomes a TimeoutReply before it is reintroduced into the queue as the original message.

The problem we are having is that when it becomes a TimeoutReply, all the workers pick it up and each recreates the original message. One original message becomes several messages (as many as there are workers) when it is timed out.

Our Rebus setup is the following:

"Server side":

        var adapter = new BuiltinContainerAdapter();
        Configure.With(adapter)
            .Logging(l => l.Log4Net())
            .Transport(t => t.UseSqlServerInOneWayClientMode(connectionString).EnsureTableIsCreated())
            .CreateBus()
            .Start();

        return adapter;

"Worker side":

        _adapter = new BuiltinContainerAdapter();
        Configure.With(_adapter)
            .Logging(l => l.Log4Net())
            .Transport(t => t.UseSqlServer(_connectionString, _inputQueue, "error")
                .EnsureTableIsCreated())
            .Events(x => x.AfterMessage += ((bus, exception, message) => SendWorkerFinishedJob(exception, message)))
            .Events(x => x.BeforeMessage += (bus, message) => SignalWorkerStartedJob(message))
            .Behavior(x => x.SetMaxRetriesFor<Exception>(0))
            .Timeouts(x => x.StoreInSqlServer(_connectionString, "timeouts").EnsureTableIsCreated())
            .CreateBus().Start(numberOfWorkers);

Any help in solving the problem or in understanding it is greatly appreciated!

jopa asked Nov 11 '22 06:11


1 Answer

The only reason I can imagine for ending up with multiple timeout replies is that every worker functions as a timeout manager, and they all seem to share the same storage.

Since the timeout manager does not use any kind of locking when querying for due timeouts, several managers can end up snatching the same due timeout, which in turn results in multiple timeout replies. In other words, there's a race condition, but it goes unnoticed because the SQL that removes a due timeout does not check whether a row was actually deleted.
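To make the race concrete, here is a small in-memory sketch. It is not Rebus code: `SharedTimeoutTable`, `RaceDemo` and `CountReplies` are hypothetical names invented for illustration, and the "table" is just a `HashSet`. It only shows how two managers that read the same due row both send a reply unless the result of the delete is checked.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a shared "timeouts" table.
public class SharedTimeoutTable
{
    private readonly HashSet<int> _rows = new HashSet<int>();

    public void Add(int id) { _rows.Add(id); }

    // Like a plain SELECT: returns a snapshot and takes no lock.
    public List<int> GetDueTimeouts() { return _rows.ToList(); }

    // Like a DELETE: reports whether a row was actually removed.
    public bool Delete(int id) { return _rows.Remove(id); }
}

public static class RaceDemo
{
    // Two managers read the same due timeout before either one deletes it.
    public static int CountReplies(bool checkDeleteResult)
    {
        var table = new SharedTimeoutTable();
        table.Add(1);

        var seenByA = table.GetDueTimeouts();
        var seenByB = table.GetDueTimeouts();

        var replies = 0;
        foreach (var id in seenByA.Concat(seenByB))
        {
            var deleted = table.Delete(id);
            // Ignoring the result sends a reply even when another manager
            // has already removed the row.
            if (!checkDeleteResult || deleted) replies++;
        }
        return replies;
    }

    public static void Main()
    {
        Console.WriteLine(CountReplies(false)); // delete result ignored: 2 replies
        Console.WriteLine(CountReplies(true));  // delete result checked: 1 reply
    }
}
```

With the delete result ignored, the single timeout produces two replies, which matches the "one message per worker" symptom described in the question.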

I suggest you either a) use separate timeout tables for the workers (e.g. _inputQueue + ".timeouts"), or b) let all the workers use an external timeout manager, i.e. omit the Timeouts(x => ...) part from the worker configuration and start a stand-alone, dedicated timeout manager.

In your scenario, I guess (a) is the easiest way to go because it's pretty close to what you have got now.
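As a rough sketch of option (a), the worker configuration from the question could be changed so that only the timeouts table name differs. This assumes each worker process is started with its own distinct `_inputQueue` value; workers that share the same `_inputQueue` would still share one table. The event hooks from the question are left out for brevity.

```csharp
// Option (a), sketched: one timeouts table per input queue, so timeout
// managers on different queues never poll the same rows.
// Assumption: _inputQueue is unique per worker process.
_adapter = new BuiltinContainerAdapter();
Configure.With(_adapter)
    .Logging(l => l.Log4Net())
    .Transport(t => t.UseSqlServer(_connectionString, _inputQueue, "error")
        .EnsureTableIsCreated())
    .Behavior(x => x.SetMaxRetriesFor<Exception>(0))
    // Only change from the question: the table name is derived from the queue name.
    .Timeouts(x => x.StoreInSqlServer(_connectionString, _inputQueue + ".timeouts")
        .EnsureTableIsCreated())
    .CreateBus().Start(numberOfWorkers);
```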

I do prefer (b) myself though, usually with one timeout manager per machine that's hosting Rebus endpoints.

Please let me know if that solves your problem.

Also, I'm curious to know how the SQL transport is working out for you :)

mookid8000 answered Nov 15 '22 07:11