
Concurrent Observer Pattern

If I have a situation like the following:

ObserverA, ObserverB, ObserverC all inherit from AbstractObserver.

I create a list of observers:

List<AbstractObserver> list = new ArrayList<AbstractObserver>();
list.add(new ObserverA());
list.add(new ObserverB());
list.add(new ObserverC());

And some kind of handler with the following methods runs in a "MAIN" thread:

public void eat(Food item) {
     for(AbstractObserver o : list) {
          o.eatFood(item);
     }
}

public void drink(Coffee cup) {
     for(AbstractObserver o : list) {
          o.drinkCoffee(cup);
     }
}

How would I design a system where I could run each eatFood and drinkCoffee method of the observers in different threads? Specifically, how would I run the eatFood or drinkCoffee method in ObserverA, ObserverB, and ObserverC in their own threads when the "MAIN" thread receives an event (drink or eat methods get called)?

I'd like a separate thread for each AbstractObserver subclass instance because I currently notify the observers sequentially, so one slow observer delays all the others.

sneeze1 asked Jan 16 '14 22:01




3 Answers

Assuming you don't need to be notified when the eating/drinking finishes, you can use the executor framework to put the work on a queue:

// declare the work queue (ExecutorService rather than Executor, so that shutdown() is available)
private final ExecutorService workQueue = Executors.newCachedThreadPool();

// when you want to eat, schedule a bunch of 'eating' jobs
public void eat(final Food item) {
    for (final AbstractObserver o : list) {
        workQueue.execute(new Runnable() {
            @Override
            public void run() {
                o.eatFood(item); // runs in background thread
            }
        });
    }
}

When your program exits, you must shut down the executor:

   workQueue.shutdown();
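
Note that shutdown() only stops the executor from accepting new tasks; it does not wait for the ones already submitted. A minimal sketch of a more careful shutdown follows, assuming the executor is an ExecutorService. The Observer interface, the String "food", the ShutdownSketch class and the 5-second timeout are all hypothetical stand-ins, not part of the question's code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ShutdownSketch {
    // Hypothetical stand-in for the question's AbstractObserver
    interface Observer {
        void eatFood(String item);
    }

    /** Notifies every observer on a pool thread, then shuts the pool down.
     *  Returns true if all notifications finished within the timeout. */
    static boolean notifyAndShutdown(List<Observer> observers, String item)
            throws InterruptedException {
        ExecutorService workQueue = Executors.newCachedThreadPool();
        for (Observer o : observers) {
            workQueue.execute(() -> o.eatFood(item)); // runs in a background thread
        }
        workQueue.shutdown(); // no new tasks accepted; submitted ones still run
        boolean finished = workQueue.awaitTermination(5, TimeUnit.SECONDS);
        if (!finished) {
            workQueue.shutdownNow(); // interrupt whatever is still running
        }
        return finished;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> eaten = new CopyOnWriteArrayList<>(); // thread-safe result list
        List<Observer> observers = List.of(
                item -> eaten.add("A:" + item),
                item -> eaten.add("B:" + item),
                item -> eaten.add("C:" + item));
        boolean finished = notifyAndShutdown(observers, "apple");
        System.out.println(finished + " " + eaten.size());
    }
}
```

In a long-running application you would keep one executor for the lifetime of the handler and run the shutdown sequence once on exit, rather than creating a pool per notification as this demo does.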
JVMATL answered Sep 16 '22 21:09


I'm not a pro at this, but perhaps you could use a producer-consumer setup. The producer (the observed entity) adds a notification to a queue on its own thread, and the consumer (the observer) takes it from the same queue on a different thread.

Hovercraft Full Of Eels answered Sep 18 '22 21:09


To elaborate on Hovercraft's answer, a basic implementation of your observer could look like this:

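import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ObserverA implements Runnable {
    // unbounded queue; an ArrayBlockingQueue would need an explicit capacity argument
    private final BlockingQueue<Food> queue = new LinkedBlockingQueue<>();

    public void eatFood(Food f) {
        queue.add(f); // returns immediately
    }

    @Override
    public void run() {
        try {
            while (true) {
                Food f = queue.take(); // blocks until some food is on the queue
                // do what you have to do with that food
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            // and exit
        }
    }
}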

So your code that calls eatFood would return immediately from that method and not block your main thread.

You obviously need to allocate a thread to the observer, either directly: new Thread(observerA).start(); or through an ExecutorService, which is probably easier and preferable.
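
As a rough sketch of the ExecutorService option, the example below runs one queue-based observer on a single-thread executor and stops it with shutdownNow(), which interrupts the blocked take(). The names QueueObserverSketch, QueueObserver and awaitDone are hypothetical, String stands in for Food, and the CountDownLatch exists only so the demo can wait for the items to be processed.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueObserverSketch {
    /** Hypothetical observer that consumes String "food" items on its own thread. */
    static class QueueObserver implements Runnable {
        private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final List<String> eaten = new CopyOnWriteArrayList<>();
        private final CountDownLatch done; // demo-only: counts processed items

        QueueObserver(int expectedItems) {
            this.done = new CountDownLatch(expectedItems);
        }

        // Called by the observed object; only enqueues, so it returns immediately
        public void eatFood(String f) {
            queue.add(f);
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String f = queue.take(); // blocks until food arrives
                    eaten.add(f);            // the "real work" would go here
                    done.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
            }
        }

        boolean awaitDone(long timeout, TimeUnit unit) throws InterruptedException {
            return done.await(timeout, unit);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        QueueObserver observerA = new QueueObserver(2);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(observerA);      // the observer loop now runs on the pool thread

        observerA.eatFood("apple");   // returns at once
        observerA.eatFood("bread");

        boolean processed = observerA.awaitDone(5, TimeUnit.SECONDS);
        pool.shutdownNow();           // interrupts queue.take() so run() returns
        System.out.println(processed + " " + observerA.eaten); // true [apple, bread]
    }
}
```

With several observers you would submit each one to a pool with at least as many threads as observers, since every run() loop occupies its thread until interrupted.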


Alternatively, you can create the threads at the "observed" object level:

private static final ExecutorService fireObservers = Executors.newFixedThreadPool(10);

public void eat(Food food) {
    for (AbstractObserver o : observers) {
        //(i)  if an observer gets stuck, the others can still make progress
        //(ii) if an observer throws an exception, it is captured in the Future and logged below
        Future<?> f = fireObservers.submit(() -> o.eatFood(food));
        // watchdog task: waits up to one second for the observer, then reports the outcome
        // (note that it also occupies a pool thread while it waits)
        fireObservers.submit(new Callable<Void>() {
            @Override public Void call() throws Exception {
                try {
                    f.get(1, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    logger.warn("Slow observer {} has not processed food {} in one second", o, food);
                } catch (ExecutionException e) {
                    logger.error("Observer " + o + " has thrown exception on food " + food, e.getCause());
                }
                return null;
            }
        });
    }
}

(I mostly copied and pasted this from here, so you will probably need to adapt it to your needs.)

assylias answered Sep 18 '22 21:09