Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Disruptor - EventHandlers not invoked

I'm playing around with the Disruptor framework, and am finding that my event handlers are not being invoked.

Here's my setup code:

private static final int BUFFER_SIZE = 1024 * 8;
private final ExecutorService  EXECUTOR = Executors.newSingleThreadExecutor();

private void initializeDisruptor() {
    if (disruptor != null)
        return;

    disruptor = 
            new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
                    new SingleThreadedClaimStrategy(BUFFER_SIZE),
                    new SleepingWaitStrategy());
    disruptor.handleEventsWith(searchTermMatchingHandler)
        .then(appendStatusHandler,updatePriceHandler).then(persistUpdatesHandler);

    this.ringBuffer = disruptor.start();
}

Elsewhere, I publish events. I've tried each of the following two approaches:

Event Publishing Approach A:

private void handleStatus(final Status status)
{

    long sequence = ringBuffer.next();
    TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
    event.setStatus(status);
    event.setSearchInstruments(searchInstruments);
    ringBuffer.publish(sequence);
}

In this scenario, I find the the first EventHandler gets invoked, but never anything beyond that.

Event Publishing Approach B:

private void handleStatus(final Status status)
{
    disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {

        @Override
        public TwitterStatusReceivedEvent translateTo(
                TwitterStatusReceivedEvent event, long sequence) {
            event.setStatus(status);
            event.setSearchInstruments(searchInstruments);
            return event;
        }
    });
}

In this scenario, I find that none of the event handlers get invoked at all.

What am I doing wrong?

Update

Here's my EventHandler in it's entirety. How should I be signalling that processing is complete?

public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {

    @Override
    public void onEvent(TwitterStatusReceivedEvent event, long sequence,
            boolean endOfBatch) throws Exception {
        String statusText = event.getStatus().getText();
        for (Instrument instrument : event.getSearchInstruments())
        {
            if (statusText.contains(instrument.getSearchTerm()))
            {
                event.setMatchedInstrument(instrument);
                break;
            }
        }
    }

}
like image 239
Marty Pitt Avatar asked Nov 29 '11 11:11

Marty Pitt


2 Answers

Each event handler needs to run in its own thread which wont exit until you shutdown the disruptor. Since you're using a single threaded executor, only the first event handler that happens to execute will ever run. (The Disruptor class stores each handler in a hashmap so which handler winds up running will vary)

If you switch to a cachedThreadPool you should find it all starts running. You won't need to do any management of the sequence numbers because that's all handled by the EventProcessor that the Disruptor class sets up and manages for you. Just processing each event you get is exactly right.

like image 127
ajsutton Avatar answered Nov 13 '22 09:11

ajsutton


You need to make sure your searchTermMatchingHandler is updating its sequence number after it processes the event. The EventHandlers further downstream (appendStatusHandler, updatePriceHandler, persistUpdatesHandler) will be inspecting the searchTermMatchingHandler sequence number to see which events they can pick up off the ring buffer.

like image 42
Trisha Avatar answered Nov 13 '22 09:11

Trisha