I'm playing around with the Disruptor framework, and am finding that my event handlers are not being invoked.
Here's my setup code:
    private static final int BUFFER_SIZE = 1024 * 8;
    private final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();

    private void initializeDisruptor() {
        if (disruptor != null)
            return;
        disruptor =
            new Disruptor<TwitterStatusReceivedEvent>(TwitterStatusReceivedEvent.EVENT_FACTORY, EXECUTOR,
                new SingleThreadedClaimStrategy(BUFFER_SIZE),
                new SleepingWaitStrategy());
        disruptor.handleEventsWith(searchTermMatchingHandler)
            .then(appendStatusHandler, updatePriceHandler).then(persistUpdatesHandler);
        this.ringBuffer = disruptor.start();
    }
Elsewhere, I publish events. I've tried each of the following two approaches:
Event Publishing Approach A:
    private void handleStatus(final Status status)
    {
        long sequence = ringBuffer.next();
        TwitterStatusReceivedEvent event = ringBuffer.get(sequence);
        event.setStatus(status);
        event.setSearchInstruments(searchInstruments);
        ringBuffer.publish(sequence);
    }
In this scenario, I find that the first EventHandler gets invoked, but never anything beyond that.
Event Publishing Approach B:
    private void handleStatus(final Status status)
    {
        disruptor.publishEvent(new EventTranslator<TwitterStatusReceivedEvent>() {
            @Override
            public TwitterStatusReceivedEvent translateTo(
                    TwitterStatusReceivedEvent event, long sequence) {
                event.setStatus(status);
                event.setSearchInstruments(searchInstruments);
                return event;
            }
        });
    }
In this scenario, I find that none of the event handlers get invoked at all.
What am I doing wrong?
Here's my EventHandler in its entirety. How should I be signalling that processing is complete?
    public class SearchTermMatchingEventHandler implements EventHandler<TwitterStatusReceivedEvent> {
        @Override
        public void onEvent(TwitterStatusReceivedEvent event, long sequence,
                boolean endOfBatch) throws Exception {
            String statusText = event.getStatus().getText();
            for (Instrument instrument : event.getSearchInstruments())
            {
                if (statusText.contains(instrument.getSearchTerm()))
                {
                    event.setMatchedInstrument(instrument);
                    break;
                }
            }
        }
    }
Each event handler needs to run in its own thread, which won't exit until you shut down the disruptor. Since you're using a single-threaded executor, only the first event handler that happens to execute will ever run. (The Disruptor class stores each handler in a hash map, so which handler winds up running will vary.)
If you switch to a cached thread pool (Executors.newCachedThreadPool()), you should find it all starts running. You won't need to do any management of the sequence numbers because that's all handled by the EventProcessor that the Disruptor class sets up and manages for you. Just processing each event you get is exactly right.
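To see the starvation effect in isolation, here is a minimal sketch that uses only java.util.concurrent and no Disruptor classes. The class name ExecutorStarvationDemo and the helper countStarted are made up for this illustration; each submitted task just blocks until released, as a stand-in for an event processor loop that never returns until shutdown.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorStarvationDemo {

    /**
     * Submits taskCount long-running tasks (stand-ins for event processors)
     * and returns how many of them began running within a short window.
     */
    static int countStarted(ExecutorService executor, int taskCount) throws InterruptedException {
        CountDownLatch started = new CountDownLatch(taskCount);
        CountDownLatch stop = new CountDownLatch(1);

        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                started.countDown();   // record that this task got a thread
                try {
                    stop.await();      // occupy the thread until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Wait briefly for all tasks to start; times out if some are still queued.
        started.await(500, TimeUnit.MILLISECONDS);
        int result = taskCount - (int) started.getCount();

        // Release the blocked tasks and let the executor wind down.
        stop.countDown();
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.SECONDS);
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("single thread: " + countStarted(Executors.newSingleThreadExecutor(), 4));
        System.out.println("cached pool:   " + countStarted(Executors.newCachedThreadPool(), 4));
    }
}
```

With the single-thread executor only one of the four tasks should get to run, while the cached pool should start all four, which mirrors why only one handler ever sees events in the setup above.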
You need to make sure your searchTermMatchingHandler is updating its sequence number after it processes the event. The EventHandlers further downstream (appendStatusHandler, updatePriceHandler, persistUpdatesHandler) will be inspecting the searchTermMatchingHandler sequence number to see which events they can pick up off the ring buffer.