
Consumer(s)-Producer issue in webserver streaming an array of data

A blog post on the Producer-Consumer pattern states that:

"2) Producer doesn't need to know about who is consumer or how many consumers are there. Same is true with Consumer."

My problem is that I have an array of data that I need to get from the web server to clients as soon as possible. Clients can appear mid-calculation, and multiple clients can request the array at different times. Once the calculation is complete, the result is cached and can simply be read.

Example use case: While the calculation is running, I want to serve every element of the array as soon as it is available. I can't use a BlockingQueue: if a second client requests the array after the first client has already called .take() on the first half, the second client misses half the data. I need something like a BlockingQueue where, instead of take(), you can call read(int index).

Solution? I have a good number of writes to my array, so I would rather not use CopyOnWriteArrayList. The Vector class should work, but would it be inefficient? Is it preferable to use a ThreadSafeList like this and just add a waitForElement() function? I don't want to reinvent the wheel, and I prefer crowd-tested solutions for multi-threaded problems.
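To make the read(int index) / waitForElement() idea concrete, here is a minimal sketch of what such a structure might look like. The class name `BlockingAppendList` and its methods are hypothetical (they are not from any library), and the sketch assumes a single append-only list guarded by one monitor lock, which may or may not be fast enough for a given write rate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical append-only list whose get(index) blocks until that element exists. */
class BlockingAppendList<T> {
    private final List<T> items = new ArrayList<>();
    private boolean complete = false;

    /** Producer side: append one element and wake any waiting readers. */
    public synchronized void add(T item) {
        if (complete) {
            throw new IllegalStateException("list is already complete");
        }
        items.add(item);
        notifyAll();
    }

    /** Producer side: signal that no more elements will arrive. */
    public synchronized void complete() {
        complete = true;
        notifyAll();
    }

    /**
     * Consumer side: wait until the element at index has been added.
     * Returns an empty Optional if the list was completed before reaching index.
     * Assumes elements are non-null.
     */
    public synchronized Optional<T> get(int index) throws InterruptedException {
        while (index >= items.size() && !complete) {
            wait(); // releases the lock until add() or complete() calls notifyAll()
        }
        return index < items.size() ? Optional.of(items.get(index)) : Optional.empty();
    }
}
```

Each client would keep its own index and call get(0), get(1), and so on until it receives an empty Optional, so a client that arrives mid-calculation still sees every element from the start.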

smuggledPancakes asked Oct 23 '15 21:10


1 Answer

As far as I understand, you need to broadcast data to subscribers/clients. Here are some approaches I know of.

  • Pure Java solution: every client has its own BlockingQueue, and every time you broadcast a message you put it in every queue.

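    // clients holds one queue per connected client (String messages assumed here)
    for (BlockingQueue<String> client : clients) {
      client.put(msg); // blocks while a bounded queue is full; throws InterruptedException
    }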
    
  • RxJava provides a reactive approach. Clients are subscribers; every time you emit a message, subscribers are notified, and they can choose to cancel their subscription.

    // RxJava 1.x API (isUnsubscribed / onCompleted)
    Observable<String> observable = Observable.create(sub -> {
        String[] msgs = {"msg1", "msg2", "msg3"};
        for (String msg : msgs) {
            if (!sub.isUnsubscribed()) {
                sub.onNext(msg); // emit to this subscriber
            }
        }
        if (!sub.isUnsubscribed()) {
            sub.onCompleted(); // signal that the stream is finished
        }
    });
    

    Now multiple subscribers can choose to receive messages.

    observable.subscribe(System.out::println);
    observable.subscribe(System.out::println);
    

    Observables support functional-style operators, so each subscriber can filter and transform the stream to get only what it needs.

    observable.filter(msg-> msg.equals("msg2")).map(String::length)
       .subscribe(msgLength->{
        System.out.println(msgLength); // or do something useful
    });
    
  • Akka provides broadcast routers
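As a rough illustration of the pure Java option above, the per-client queues could be wrapped in a small broadcaster. This is only a sketch: the `Broadcaster` class and its method names are hypothetical, and the replay of earlier messages to late subscribers is an added assumption (it is not part of the loop shown above) meant to cover clients that connect mid-calculation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical broadcaster: one unbounded queue per client, with replay for late joiners. */
class Broadcaster<T> {
    private final List<T> history = new ArrayList<>();
    private final List<BlockingQueue<T>> clients = new ArrayList<>();

    /** Register a client; its queue is pre-filled with everything broadcast so far. */
    public synchronized BlockingQueue<T> subscribe() {
        BlockingQueue<T> queue = new LinkedBlockingQueue<>(history);
        clients.add(queue);
        return queue;
    }

    /** Stop delivering messages to the given client queue. */
    public synchronized void unsubscribe(BlockingQueue<T> queue) {
        clients.remove(queue);
    }

    /** Record msg and deliver it to every registered client. */
    public synchronized void broadcast(T msg) {
        history.add(msg);
        for (BlockingQueue<T> client : clients) {
            client.offer(msg); // never blocks: the queues are unbounded
        }
    }
}
```

Each client thread then calls take() on its own queue, so one client consuming a message does not hide it from another. The trade-off is memory: every message is held once in the history and once per client queue until it is consumed.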

Sleiman Jneidi answered Sep 19 '22 21:09