Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use a disruptor with multiple message types

My system has two different types of messages - type A and B. Each message has a different structure - type A contains an int member and type B contains a double member. My system needs to pass both types of messages to numerous business logic threads. Reducing latency is very important so I am investigating using a Disruptor to pass messages from the main thread to the business logic threads in a mechanically sympathetic manner.

My problem is that the disruptor only accepts one type of object in the ring buffer. This makes sense because the disruptor pre-allocates the objects in the ring buffer. However, it also makes it difficult to pass two different types of messages to my business logic threads via the Disruptor. From what I can tell, I have four options:

  1. Configure the disruptor to use objects containing a fixed size byte array (as recommended by How should one use Disruptor (Disruptor Pattern) to build real-world message systems?). In this case, the main thread must encode the messages into byte arrays before publishing them to the disruptor and each of the business logic threads must decode the byte arrays back into objects upon receipt. The downside of this setup is that the business logic threads are not truly sharing the memory from the disruptor - instead they are creating new objects (and thus creating garbage) from the byte array provided by the disruptor. The upside of this setup is that all business logic threads can read multiple different types of messages from the same disruptor.

  2. Configure the disruptor to use a single type of object but create multiple disruptors, one for for each object type. In the case above, there would be two separate disruptors - one for objects of type A and another for objects of type B. The upside of this setup is that the main thread doesn't have to encode the object to a byte array and the business less logic threads can share the same objects as used in the disruptor (no garbage created). The downside of this setup is that somehow each business logic thread will have to subscribe to messages from multiple disruptors.

  3. Configure the disruptor to use a single type of "super" object that contains all fields of both message A and B. This is very against OO style, but will allow for a compromise between option #1 and #2.

  4. Configure the disruptor to use object references. However, in this case I lose the performance benefits of object preallocation and memory ordering.

What do you recommend for this situation? I feel that option #2 is the cleanest solution, but I don't know whether or how consumers can technically subscribe to messages from multiple disruptors. If anyone can provide an example for how to implement option #2, it would be much appreciated!

like image 953
Ben Baumgold Avatar asked May 30 '13 21:05

Ben Baumgold


2 Answers

Configure the disruptor to use objects containing a fixed size byte array (as recommended by How should one use Disruptor (Disruptor Pattern) to build real-world message systems?). In this case, the main thread must encode the messages into byte arrays before publishing them to the disruptor and each of the business logic threads must decode the byte arrays back into objects upon receipt. The downside of this setup is that the business logic threads are not truly sharing the memory from the disruptor - instead they are creating new objects (and thus creating garbage) from the byte array provided by the disruptor. The upside of this setup is that all business logic threads can read multiple different types of messages from the same disruptor.

This would be my preferred approach, but I slightly coloured by our use cases, just about every place that we've used the Disruptor it's either receiving from or sending to some sort of I/O device, so our basic currency is byte arrays. You can get around the object creation by using a flyweight approach to marshalling. To see an example of this, I used Javolution's Struct and Union classes in an example that I presented at Devoxx (https://github.com/mikeb01/ticketing). If you can completely deal with the object before returning from the onEvent call from the event handler then this approach works well. If the event needs to live beyond that point then you need to make some sort of copy of the data, e.g. de-serialising it into an object.

Configure the disruptor to use a single type of object but create multiple disruptors, one for for each object type. In the case above, there would be two separate disruptors - one for objects of type A and another for objects of type B. The upside of this setup is that the main thread doesn't have to encode the object to a byte array and the business less logic threads can share the same objects as used in the disruptor (no garbage created). The downside of this setup is that somehow each business logic thread will have to subscribe to messages from multiple disruptors.

Not tried this approach, you'd probably need a custom EventProcessor that can poll from multiple ring buffers.

Configure the disruptor to use a single type of "super" object that contains all fields of both message A and B. This is very against OO style, but will allow for a compromise between option #1 and #2. Configure the disruptor to use object references. However, in this case I lose the performance benefits of object preallocation and memory ordering.

We've done this in a couple of cases where some cases where lack of preallocation is tolerable. It works okay. If you are passing objects then you need to make sure that you null them out once you are finished with them on the consumer side. We found that using a double dispatch pattern for the "super" object kept the implementation fairly clean. One drawback to this is that it you will get slightly longer GC stalls that with something that was a straight array of objects as the GC has more live objects to traverse during the mark phase.

What do you recommend for this situation? I feel that option #2 is the cleanest solution, but I don't know whether or how consumers can technically subscribe to messages from multiple disruptors. If anyone can provide an example for how to implement option #2, it would be much appreciated!

Another option, if you want complete flexibility with regards to the use of data, is to not use the ring buffer, but instead talk directly to the Sequencer and define your object layout as you best see fitting.

like image 134
gtzinos Avatar answered Oct 20 '22 01:10

gtzinos


Ben Baumgold, I am sure you found a solution by now. Your #4 (or #3) can be implemented trivially by creating an event holder. Think of it as enum for Objects. To speed look-ups, events should be enriched with an enum type. Notice, I am storing a reference to the original event in the holder. It may be more appropriate to create a copy constructor or clone() and copy events on insertion into the ring buffer.

Illustrating by example:

// this is enum used in events

public enum MyEventEnum {
EVENT_TIMER,
EVENT_MARKETDATA;
}

// this is holder. At any time, this instance in ringbuffer holds just one event indexed by array[ type.ordinal() ]. why array should be obvious from the code.

public class RingBufferEventHolder {    
 private MyEventEnum;   
 private EventBase array[];

 public RingBufferEventHolder() {
    array=new EventBase[MyEventEnum.values().length]; 
 }

 // TODO: null the rest
 public void setEvent(EventBase event) {
    type=event.getType();
    switch( event.getType() ) {
        case EVENT_TIMER:
            array[MyEventEnum.EVENT_TIMER.ordinal()]=event;
            break;
        case EVENT_MARKETDATA:
            array[MyEventEnum.EVENT_MARKETDATA.ordinal()]=event;
            break;
        default:
            throw new RuntimeException("Unknown event type " + event );
    }
}

// publish event

   EventBase newEvent=new EventMarketData(....);
   // prepare
   long nextSequence = ringBuffer.next(); 
   RingBufferEventHolder holder = ringBuffer.get(nextSequence);
   holder.setEvent(newEvent);
   // make the event available to EventProcessors 
   ringBuffer.publish(nextSequence);
like image 20
Vortex Avatar answered Oct 20 '22 01:10

Vortex