 

NServiceBus Handle Messages as a Batch

I'm finding a common pattern emerging in backend message processing:

ServiceA generates a large number of messages.

ServiceB processes one message at a time.

ServiceC issues a call to a database or web service that gains substantial performance and reliability benefits from being called in a batch.

In some cases it's not feasible to pre-batch the messages from ServiceA or to process messages in a batch in ServiceB, so the preference would be to process all messages individually until the final call at ServiceC. This requires a batching step prior to the ServiceC call.

The seeming ideal would be to have an NServiceBus handler signature that optionally delivers messages in a batch such as:

public void Handle(FooMessage[] messageBatch)
{
}

where none of the messages in messageBatch are committed until the handler completes.

This doesn't appear to be natively supported in NServiceBus. I can handle messages one at a time off the queue and write them to memory until a batch flush, but in that case the messages are committed prior to the flush, and we lose the delivery guarantee for the messages in the batch if the process crashes.
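To illustrate, the in-memory workaround looks roughly like this (an NServiceBus 5-style handler; FooMessage and BatchWriter are just placeholders):

using System.Collections.Concurrent;
using System.Collections.Generic;
using NServiceBus;

public class FooMessageHandler : IHandleMessages<FooMessage>
{
    // Shared buffer across handler invocations (a handler instance is created per message).
    static readonly ConcurrentQueue<FooMessage> Buffer = new ConcurrentQueue<FooMessage>();
    const int FlushThreshold = 50;

    public void Handle(FooMessage message)
    {
        // The queue transaction completes when this method returns, so anything
        // still sitting in Buffer is lost if the process crashes before the flush.
        Buffer.Enqueue(message);

        if (Buffer.Count >= FlushThreshold)
        {
            var batch = new List<FooMessage>();
            FooMessage item;
            while (batch.Count < FlushThreshold && Buffer.TryDequeue(out item))
            {
                batch.Add(item);
            }
            BatchWriter.Write(batch); // single batched call to the database/web service
        }
    }
}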

So the question is: is this a bad pattern for some reason I'm not thinking of? I understand there is an inherent issue with knowing when to flush the batch, but it appears that at least some of the transport implementations already buffer messages in a batch under the hood and simply deliver them one at a time. Batching at that level, or adding a simple timeout for periodic flushing, seems like it would work.

Is there a workaround or a preferred pattern I'm missing?

asked Dec 15 '15 by silijon

1 Answer

Upfront/Disclaimer: I work for Particular Software, the makers of NServiceBus. I also wrote Learning NServiceBus.

History

Before I worked for Particular, I once found myself in your exact situation. I had an analytics type of situation, where 12 web servers were sending the same type of command over MSMQ to indicate an article was viewed. These counts needed to be tracked in a database so that "most popular" lists could be generated based on number of views. But an insert from every single page view does NOT perform well, so I introduced the service bus.

The inserter could have benefited from inserting 50-100 rows at a time using a table-valued parameter, but NServiceBus only gives you one message at a time, within a transaction.
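For reference, a table-valued parameter insert looks roughly like this (the table type, stored procedure, and column names here are made up for illustration):

using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;

static void InsertViews(string connectionString, IEnumerable<int> contentIds)
{
    // Assumes a user-defined table type dbo.ArticleViewType(ContentId int)
    // and a matching stored procedure dbo.InsertArticleViews exist.
    var rows = new DataTable();
    rows.Columns.Add("ContentId", typeof(int));
    foreach (var id in contentIds)
    {
        rows.Rows.Add(id);
    }

    using (var connection = new SqlConnection(connectionString))
    using (var command = new SqlCommand("dbo.InsertArticleViews", connection))
    {
        command.CommandType = CommandType.StoredProcedure;
        var parameter = command.Parameters.AddWithValue("@Views", rows);
        parameter.SqlDbType = SqlDbType.Structured;
        parameter.TypeName = "dbo.ArticleViewType";

        connection.Open();
        command.ExecuteNonQuery(); // one round trip for the whole batch
    }
}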

Why not use a Saga?

In NServiceBus, anything that operates on multiple messages generally needs to use a Saga. (A Saga is essentially a set of related message handlers that keeps some stored state between processing each message.)

But the Saga has to store its data somewhere, and that generally means a database. So let's compare:

  • With NServiceBus now, 50 messages in would mean 50 database inserts.
  • With a hypothetical batch receive, 50 messages in would mean 1 database batch insert.
  • With Sagas, 50 messages in means 50 reads of Saga data + 50 updates of Saga data, and THEN a single database batch insert.

So a Saga makes the "persistence load" much worse.

Of course, you could elect to use in-memory persistence for the Saga. This would give you batching without additional persistence overhead, but if the Saga endpoint crashes, you could lose a partial batch. So if you aren't comfortable losing data, that's not an option.
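To make the comparison concrete, a batching saga might look roughly like this (an NServiceBus 5-style API sketch; FooMessage.BatchId, ContentId, and BatchWriter are invented for illustration, and finding a real correlation key for an open-ended stream is itself part of the problem):

using System;
using System.Collections.Generic;
using NServiceBus;
using NServiceBus.Saga;

public class BatchingSagaData : ContainSagaData
{
    public Guid BatchId { get; set; }
    public List<int> ContentIds { get; set; }
}

public class BatchingSaga : Saga<BatchingSagaData>, IAmStartedByMessages<FooMessage>
{
    protected override void ConfigureHowToFindSaga(SagaPropertyMapper<BatchingSagaData> mapper)
    {
        mapper.ConfigureMapping<FooMessage>(m => m.BatchId).ToSaga(s => s.BatchId);
    }

    public void Handle(FooMessage message)
    {
        // Every incoming message costs a read + update of the saga data in the persister.
        if (Data.ContentIds == null)
            Data.ContentIds = new List<int>();
        Data.ContentIds.Add(message.ContentId);

        if (Data.ContentIds.Count >= 50)
        {
            BatchWriter.Write(Data.ContentIds); // the single batched insert
            MarkAsComplete();
        }
    }
}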

What would batch receive look like?

So even years ago, I had visualized something like this:

// Not a real NServiceBus thing! Only exists in my imagination!
public interface IHandleMessageBatches<TMessage>
{
    void Handle(TMessage[] messages);
    int MaxBatchSize { get; }
}

The idea would be that if the message transport could peek ahead and see many messages available, it could begin receiving up to the MaxBatchSize and you'd get them all at once. Of course, if only 1 message was in the queue, you'd get an array with 1 message.

Problems

I sat down with the NServiceBus codebase a few years ago thinking I would try to implement this. Well, I failed. At the time, MSMQ was the only transport (this was NServiceBus V3), and the API was architected so that the transport code peeked at the queue, pulled out one message at a time, and raised an in-memory event for the message handling logic to kick in. It would have been impossible to change that without massive breaking changes.

The code in more recent versions is much more modular, in large part because multiple message transports are now supported. However, there is still an assumption of dealing with one message at a time.

The current implementation going into V6 is in the IPushMessages interface. In the Initialize method, the Core pushes a Func<PushContext, Task> pipe into the transport's implementation of IPushMessages.

Or in English, "Hey Transport, when you have a message available, execute this to hand it over to the Core and we'll take it from there."
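As a rough paraphrase (not the exact interface; the signatures have shifted between pre-release builds of V6), the shape is something like:

using System;
using System.Threading.Tasks;

// Simplified sketch of the contract described above.
public interface IPushMessages
{
    // The Core hands the transport a callback that takes exactly one
    // message's context at a time; the transport invokes it per message.
    Task Initialize(Func<PushContext, Task> pipe, PushSettings settings);

    // Start/Stop members omitted for brevity.
}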

In short, this is because NServiceBus is geared toward the reliable processing of one message at a time. From a more detailed perspective, there are many reasons why batching receives would prove difficult:

  • When transactions are in play, receiving a batch requires processing all of the messages within that transaction. This can easily get out of hand if the transaction grows too large.
  • Message types may be mixed within a queue. Message type, after all, is only a header. There is no way to say "Give me a batch of messages of type T." What if you receive a batch and it contains other message types?
  • It's possible for multiple handlers to run on the same message type. For instance, if a message SuperMessage inherits BaseMessage, handlers for both types can run on the same message. This possibility for multiple handlers and polymorphic message handlers gets very complex when considering a batch of messages.
  • More on polymorphic messages: what if the batch is Handle(BaseMessage[] batch) but the incoming messages are different subtypes that all inherit from BaseMessage?
  • Many other things, I'm sure, that I have not even thought of.

All told, changing NServiceBus to accept batches would require the entire pipeline to be optimized for batches. Single messages (the current norm) would become a special case: a batch where the array size is 1.

So essentially, this would be far too risky a change for the somewhat limited business value it would provide.

Recommendations

What I found was that doing a single insert per message was not as expensive as I thought. What is bad is for multiple threads on multiple web servers to try to write to the database at once and to be stuck in that RPC operation until it's complete.

When these actions are serialized to a queue, and a small, fixed number of threads process those messages and do database inserts at a rate the database can handle, things tend to run quite smoothly, most of the time.

Also, think carefully about what you do in the database. An update of an existing row is a lot cheaper than an insert. In my case, I really only cared about counts and didn't need a record for each individual page view. So it was cheaper to keep one record per content id and 5-minute time window and increment that record's read count, rather than inserting a record per read and forcing myself into a lot of aggregate queries down the line.
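As an illustrative sketch (table and column names invented; real code would also need to handle the upsert race, e.g. with MERGE or a retry), that kind of counter update might look like:

using System;
using System.Data.SqlClient;

static void RecordView(string connectionString, int contentId, DateTime viewedUtc)
{
    // Truncate the timestamp to a 5-minute bucket.
    var bucketTicks = viewedUtc.Ticks - (viewedUtc.Ticks % TimeSpan.FromMinutes(5).Ticks);
    var bucket = new DateTime(bucketTicks, DateTimeKind.Utc);

    const string sql = @"
UPDATE dbo.ArticleViewCounts
   SET ViewCount = ViewCount + 1
 WHERE ContentId = @ContentId AND TimeBucket = @Bucket;

IF @@ROWCOUNT = 0
    INSERT INTO dbo.ArticleViewCounts (ContentId, TimeBucket, ViewCount)
    VALUES (@ContentId, @Bucket, 1);";

    using (var connection = new SqlConnection(connectionString))
    using (var command = new SqlCommand(sql, connection))
    {
        command.Parameters.AddWithValue("@ContentId", contentId);
        command.Parameters.AddWithValue("@Bucket", bucket);
        connection.Open();
        command.ExecuteNonQuery();
    }
}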

If this absolutely will not work, you need to think about what tradeoffs you can make in reliability. You could use a Saga with in-memory persistence, but then you can (and most likely will eventually) lose entire batches. That very well might be acceptable, depending on your use case.

You could also use message handlers to write to Redis, which would be cheaper than a database, and then have a Saga that acts more like a scheduler to migrate that data in batches to a database. You could probably do similar things with Kafka or a bunch of other technologies. In those cases it would be up to you to decide what kind of reliability guarantees you need and set up the tools that can deliver on that.
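As a rough sketch of that idea using StackExchange.Redis (the key name, batch size, and BatchWriter are invented; note that anything popped from Redis but not yet inserted can still be lost if the process dies mid-flush):

using System.Collections.Generic;
using System.Threading.Tasks;
using StackExchange.Redis;

public class RedisViewBuffer
{
    readonly IDatabase db;
    const string Key = "pending-article-views";

    public RedisViewBuffer(ConnectionMultiplexer redis)
    {
        db = redis.GetDatabase();
    }

    // Called from the message handler: a Redis list push is much cheaper than a SQL insert.
    public Task BufferAsync(int contentId)
    {
        return db.ListRightPushAsync(Key, contentId);
    }

    // Called periodically (e.g. from a scheduler or saga timeout): drain a batch and insert it in one go.
    public async Task FlushAsync(int maxBatchSize)
    {
        var batch = new List<int>();
        while (batch.Count < maxBatchSize)
        {
            var value = await db.ListLeftPopAsync(Key);
            if (value.IsNull)
                break;
            batch.Add((int)value);
        }

        if (batch.Count > 0)
        {
            BatchWriter.Write(batch); // one batched database insert
        }
    }
}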

answered Sep 25 '22 by David Boike