I'm using Guava's EventBus to kick off some processing and report results. Here's a very simple compilable example:
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;

public class Test {
    public static class InitiateProcessing { }
    public static class ProcessingStarted { }
    public static class ProcessingResults { }
    public static class ProcessingFinished { }

    public static EventBus bus = new EventBus();

    @Subscribe
    public void receiveStartRequest(InitiateProcessing evt) {
        System.out.println("Got processing request - starting processing");
        bus.post(new ProcessingStarted());
        System.out.println("Generating results");
        bus.post(new ProcessingResults());
        System.out.println("Generating more results");
        bus.post(new ProcessingResults());
        bus.post(new ProcessingFinished());
    }

    @Subscribe
    public void processingStarted(ProcessingStarted evt) {
        System.out.println("Processing has started");
    }

    @Subscribe
    public void resultsReceived(ProcessingResults evt) {
        System.out.println("got results");
    }

    @Subscribe
    public void processingComplete(ProcessingFinished evt) {
        System.out.println("Processing has completed");
    }

    public static void main(String[] args) {
        Test t = new Test();
        bus.register(t);
        bus.post(new InitiateProcessing());
    }
}
I use these events as a way for other software components to react in preparation for this processing. For example, they may have to save their current state before processing and restore it after.
I would expect the output of this program to be:
Got processing request - starting processing
Processing has started
Generating results
got results
Generating more results
got results
Processing has completed
Instead, the actual output is:
Got processing request - starting processing
Generating results
Generating more results
Processing has started
got results
got results
Processing has completed
The event that is supposed to indicate that processing has started actually happens after the actual processing ("generating results").
After looking at the source code, I understand why it's behaving this way. Here's the relevant source code for the EventBus:
/**
 * Drain the queue of events to be dispatched. As the queue is being drained,
 * new events may be posted to the end of the queue.
 */
void dispatchQueuedEvents() {
    // don't dispatch if we're already dispatching, that would allow reentrancy
    // and out-of-order events. Instead, leave the events to be dispatched
    // after the in-progress dispatch is complete.
    if (isDispatching.get()) {
        return;
    }
    // dispatch event (omitted)
What's happening is that, since I'm already dispatching the top-level InitiateProcessing event, the rest of the events just get pushed to the end of the queue. I would like this to behave similarly to .NET events, where invoking the event doesn't return until all handlers have completed.
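To make the queueing effect concrete, here is a minimal sketch in plain Java that imitates the behavior without using Guava at all. TinyBus, its dispatching flag, and the string event names are hypothetical stand-ins I made up for illustration; this is not the library's actual implementation, just the same idea of "if we're already dispatching, enqueue and return".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class QueuedDispatchSketch {
    // Hypothetical stand-in for a non-reentrant bus (not Guava's real code).
    static class TinyBus {
        private final List<Consumer<String>> handlers = new ArrayList<>();
        private final Queue<String> queue = new ArrayDeque<>();
        private boolean dispatching = false;

        void register(Consumer<String> handler) {
            handlers.add(handler);
        }

        void post(String event) {
            queue.add(event);
            if (dispatching) {
                return; // nested post: leave the event for the outer drain loop
            }
            dispatching = true;
            try {
                String next;
                while ((next = queue.poll()) != null) {
                    for (Consumer<String> handler : handlers) {
                        handler.accept(next);
                    }
                }
            } finally {
                dispatching = false;
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        TinyBus bus = new TinyBus();
        bus.register(evt -> {
            if (evt.equals("initiate")) {
                log.add("handler: start");
                bus.post("started");   // queued, not delivered yet
                log.add("handler: work");
                bus.post("finished");  // queued behind "started"
            } else {
                log.add("received " + evt);
            }
        });
        bus.post("initiate");
        return log;
    }

    public static void main(String[] args) {
        // Prints: handler: start, handler: work, received started, received finished
        run().forEach(System.out::println);
    }
}
```

Both nested posts are only delivered after the outer handler returns, which is the same reordering I see in the real output above.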
I don't quite understand the reason for this implementation. Sure, the events are guaranteed to be in order, but the order of the surrounding code gets completely distorted.
Is there any way to get the bus to behave as described and produce the desired output? I did read in the Javadocs that:
"The EventBus guarantees that it will not call a subscriber method from multiple threads simultaneously, unless the method explicitly allows it by bearing the @AllowConcurrentEvents annotation."
But I don't think this applies here - I'm seeing this issue in a single-threaded application.
Edit
The cause of the issue here is that I'm posting from within a subscriber. Since the event bus is not reentrant, these "sub-posts" get queued up and are handled after the first handler completes. I can comment out the if (isDispatching.get()) { return; } section in the EventBus source and everything behaves as I would expect - so the real question is what potential problems have I introduced by doing so? It seems the designers made a conscious decision to not allow reentrancy.
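One consequence that the source comment hints at ("out-of-order events") can be reproduced with a small plain-Java sketch. ImmediateBus and the subscriber names A and B below are hypothetical, made up for illustration only; the sketch assumes a bus that simply calls every subscriber before post() returns, which is roughly what removing the check amounts to in a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ImmediateDispatchSketch {
    // Hypothetical reentrant bus: post() calls every subscriber before returning.
    static class ImmediateBus {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void register(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void post(String event) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(event);
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        ImmediateBus bus = new ImmediateBus();
        // Subscriber A reacts to "started" by posting "finished" from inside its handler.
        bus.register(evt -> {
            log.add("A got " + evt);
            if (evt.equals("started")) {
                bus.post("finished");
            }
        });
        // Subscriber B only records what it sees, in the order it sees it.
        bus.register(evt -> log.add("B got " + evt));
        bus.post("started");
        return log;
    }

    public static void main(String[] args) {
        // Prints: A got started, A got finished, B got finished, B got started
        run().forEach(System.out::println);
    }
}
```

Here B is told about "finished" before it ever hears about "started", even though "started" was posted first. With only one subscriber per event type, as in my test program, this never shows up, but it looks like the kind of ordering problem the queue is there to prevent.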
I know this question is 4 years old, but I just ran into the same problem today. There is a simple (and counter-intuitive) change to get the behavior you want. Per https://stackoverflow.com/a/53136251/1296767, you can use an AsyncEventBus with a DirectExecutor:
public static EventBus bus = new AsyncEventBus(MoreExecutors.newDirectExecutorService());
Running your test code with the above change, the results are exactly what you want:
Got processing request - starting processing
Processing has started
Generating results
got results
Generating more results
got results
Processing has completed