If I have a situation like the following:
ObserverA, ObserverB, ObserverC all inherit from AbstractObserver.
I create a list of observers:
List<AbstractObserver> list = new ArrayList<AbstractObserver>();
list.add(new ObserverA());
list.add(new ObserverB());
list.add(new ObserverC());
And some kind of handler with the following methods runs in a "MAIN" thread:
public void eat(Food item) {
    for (AbstractObserver o : list) {
        o.eatFood(item);
    }
}

public void drink(Coffee cup) {
    for (AbstractObserver o : list) {
        o.drinkCoffee(cup);
    }
}
How would I design a system where I could run each eatFood and drinkCoffee method of the observers in different threads? Specifically, how would I run the eatFood or drinkCoffee method in ObserverA, ObserverB, and ObserverC in their own threads when the "MAIN" thread receives an event (drink or eat methods get called)?
I'd like a separate thread for each AbstractObserver subclass instance because I currently notify each observer sequentially, which could cause delays.
What is the observer pattern? Observer is a behavioral design pattern that specifies communication between two kinds of objects: an observable and its observers. An observable is an object that notifies its observers about changes in its state. For example, a news agency can notify channels when it receives news.
In software design and engineering, the observer pattern is a software design pattern in which an object, named the subject, maintains a list of its dependents, called observers, and notifies them automatically of any state changes, usually by calling one of their methods.
The observer pattern is generally implemented within a single application. The publisher-subscriber pattern, on the other hand, is mostly used across applications (for example, Kafka at the heart of an event-driven architecture) and generally serves to decouple data/event streams and systems.
In Java, you may have come across the interface java.util.Observer and the class java.util.Observable, which were commonly used to implement the observer pattern. Both were deprecated in Java 9 and are no longer recommended for use.
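Since Java 9, one alternative in the JDK is the Flow API in java.util.concurrent. The following is only a minimal sketch of how it could apply to the question: SubmissionPublisher hands each item to its subscribers asynchronously, so the publishing thread does not wait for them. The names FlowDemo and EatingSubscriber are hypothetical, and String stands in for the question's Food type.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowDemo {
    // Hypothetical subscriber standing in for one observer.
    static class EatingSubscriber implements Flow.Subscriber<String> {
        final List<String> eaten = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(Long.MAX_VALUE); // no back-pressure: accept everything
        }
        @Override public void onNext(String food) { eaten.add(food); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    public static void main(String[] args) throws InterruptedException {
        EatingSubscriber a = new EatingSubscriber();
        EatingSubscriber b = new EatingSubscriber();

        // The no-arg constructor uses a default executor for delivery,
        // normally ForkJoinPool.commonPool().
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(a);
            publisher.subscribe(b);
            publisher.submit("apple"); // returns without waiting for the subscribers
        } // close() sends onComplete once the pending items have been delivered

        // Wait (with a timeout) until both subscribers have seen onComplete.
        a.done.await(5, TimeUnit.SECONDS);
        b.done.await(5, TimeUnit.SECONDS);
        System.out.println(a.eaten + " " + b.eaten);
    }
}
```

Each subscriber receives its items in order, but different subscribers are served independently, which is roughly the behaviour the question asks for.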
Assuming, for simplicity, that you don't need to be notified when the eating/drinking finishes, you could also use the executor framework to throw the work onto a queue:
// declare the work queue (ExecutorService rather than Executor, so that shutdown() is available)
private final ExecutorService workQueue = Executors.newCachedThreadPool();

// when you want to eat, schedule a bunch of 'eating' jobs
public void eat(final Food item) {
    for (final AbstractObserver o : list) {
        workQueue.execute(new Runnable() {
            @Override
            public void run() {
                o.eatFood(item); // runs in a background thread
            }
        });
    }
}
When your program exits, you must shut down the executor:
workQueue.shutdown();
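The snippet above assumes you don't care when the observers finish. If you do, one option is ExecutorService.invokeAll, which runs all jobs in parallel and returns only once every one of them has completed. This is a rough sketch, not a drop-in replacement: WaitingHandler and its nested Observer interface are hypothetical stand-ins for the question's classes, and String stands in for Food.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class WaitingHandler {
    // Hypothetical stand-in for AbstractObserver.
    interface Observer { void eatFood(String item); }

    private final List<Observer> list = new ArrayList<>();
    private final ExecutorService workQueue = Executors.newCachedThreadPool();

    void add(Observer o) { list.add(o); }

    // Notifies every observer in parallel and returns once all have finished.
    public void eat(final String item) {
        List<Callable<Void>> jobs = new ArrayList<>();
        for (final Observer o : list) {
            jobs.add(() -> { o.eatFood(item); return null; });
        }
        try {
            // invokeAll blocks until every job has completed, normally or not.
            for (Future<Void> f : workQueue.invokeAll(jobs)) {
                try {
                    f.get(); // already done; rethrows the observer's exception, if any
                } catch (ExecutionException e) {
                    System.err.println("Observer failed: " + e.getCause());
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and give up waiting
        }
    }

    public void shutdown() { workQueue.shutdown(); }

    public static void main(String[] args) {
        WaitingHandler handler = new WaitingHandler();
        List<String> eaten = new CopyOnWriteArrayList<>();
        handler.add(item -> eaten.add("A:" + item));
        handler.add(item -> eaten.add("B:" + item));
        handler.eat("apple");              // returns after both observers have run
        System.out.println(eaten.size());  // both entries are present at this point
        handler.shutdown();
    }
}
```

The trade-off is that the calling thread now waits for the slowest observer, so this only helps if the observers' total time matters more than keeping the caller free.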
I'm not a pro at this, but perhaps you could use a producer-consumer setup. Here the producer (the observed entity) adds a notification to a queue on its own thread, and the consumer (the observer) takes it from the same queue on its own thread.
To elaborate on Hovercraft's answer, a basic implementation of your observer could look like this:
class ObserverA implements Runnable {
    // ArrayBlockingQueue has no no-arg constructor (it needs a capacity),
    // so an unbounded LinkedBlockingQueue is used here instead
    private final BlockingQueue<Food> queue = new LinkedBlockingQueue<>();

    public void eatFood(Food f) {
        queue.add(f); // returns immediately
    }

    @Override
    public void run() {
        try {
            while (true) {
                Food f = queue.take(); // blocks until some food is on the queue
                // do what you have to do with that food
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // and exit
        }
    }
}
So your code that calls eatFood would return immediately from that method and not block your main thread.
You obviously need to allocate a thread to the observer, either directly with new Thread(observerA).start(); or through an ExecutorService, which is probably easier and preferable.
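As a rough illustration of the ExecutorService route, the sketch below runs each queue-based observer as one long-running task and stops them with shutdownNow(), which interrupts the blocked take() calls. QueueObserverDemo and QueueObserver are hypothetical names, String stands in for Food, and the sleep is only a crude way to let the demo observers catch up.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class QueueObserverDemo {
    // Hypothetical observer that drains its own queue on its own thread.
    static class QueueObserver implements Runnable {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final List<String> eaten = new CopyOnWriteArrayList<>();

        public void eatFood(String food) {
            queue.add(food); // returns immediately; the caller is never blocked
        }

        @Override
        public void run() {
            try {
                while (true) {
                    eaten.add(queue.take()); // blocks until food is available
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newCachedThreadPool();
        QueueObserver a = new QueueObserver();
        QueueObserver b = new QueueObserver();
        pool.execute(a); // one long-running task per observer
        pool.execute(b);

        a.eatFood("apple");
        b.eatFood("apple");

        Thread.sleep(200);  // crude: give the observers time to drain their queues
        pool.shutdownNow(); // interrupts the blocked take() calls so run() returns
        pool.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println(a.eaten + " " + b.eaten);
    }
}
```

A cached pool is used so that every observer gets its own thread; with a fixed pool smaller than the number of observers, some of these never-ending tasks would never start.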
Alternatively, you can create the threads at the "observed" object level:
private static final ExecutorService fireObservers = Executors.newFixedThreadPool(10);

public void eat(Food food) {
    for (AbstractObserver o : list) {
        // (i) if an observer gets stuck, the others can still make progress
        // (ii) if an observer throws, the exception is captured in its Future
        //      and logged by the watcher task below
        Future<?> f = fireObservers.submit(() -> o.eatFood(food));
        // watcher task: waits up to one second for the observer to finish
        fireObservers.submit(new Callable<Void>() {
            @Override public Void call() throws Exception {
                try {
                    f.get(1, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    logger.warn("Slow observer {} has not processed food {} in one second", o, food);
                } catch (ExecutionException e) {
                    logger.error("Observer " + o + " has thrown exception on food " + food, e.getCause());
                }
                return null;
            }
        });
    }
}
(I mostly copy-pasted this from here; you will probably need to adapt it to your needs.)