Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use custom class as Observable and fire method in time interval

I have a class Producer, simplifying it has method public Object readData() I want to make this class as Observable (RxJava).

How to indicate which method should be called? Do I need transform my Producer class into Future or Iterable?

The next problem is that the readData should be call every n seconds. Some methods, for instance from, has scheduler parameter but I can not find any example how to apply it. I found interval method, but it emits a sequence of integers. So far, without Observable I use Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(....)

like image 623
LancerX Avatar asked Dec 13 '13 11:12

LancerX


2 Answers

You can still use the interval() method, just ignore its result!

final Producer producer = ...;
int n = ...;
Observable<Object> obs = 
    Observable.interval(n,TimeUnit.SECONDS)
              .map(new Func1<Integer,Object>() {
                  @Override
                  public Object call(Integer i) {
                      return producer.readData();
                  }
               ));
like image 159
Dave Moten Avatar answered Oct 06 '22 01:10

Dave Moten


I am new to RxJava, but nobody else has answered your question so here is a stab at it.

My suggestion is for your Producer class to implement the interface Observable.OnSubscribeFunc (Javadoc). In the method public Subscription onSubscribe(Observer<T> observer), you can call your Producer#readData() method to get what data is currently available and pass that data into the onNext() method, like this:

// Make sure to change T as appropriate
public class Producer implements OnSubscribeFunc<T> {
    . . . 

    @Override
    public Subscription onSubscribe(Observer<T> observer) {
        while (this.hasData()) {
            observer.onNext(this.readData());
        observer.onCompleted();
    }
}

(This assumes that your Producer#readData() method returns data in chunks rather than all at once. Tweak as needed.)

You can then subscribe to your Producer object using Observable#create(OnSubscribeFunc<T>) method, and also arrange to have your Observable return elements on a timer:

// In your observer class
Observable<T> myProducer = Observable.create(new Producer(...));

ScheduledExecutorService scheduler = 
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(....);
Observable<T> myProducerTimed = myProducer.subscribeOn(Schedulers.executor(scheduler));
// Now use myProducerTimed

I can't test those last two lines right now. Sorry, I'll look into that and update my answer if it changes things.

DISCLAIMER: I am an RxJava n00b, so this solution might suck or might be needlessly messy. Fix what seems wrong.

UPDATE: I had a problem (RxJava -- Terminating Infinite Streams) with RxJava as well; looks like my solution will look a lot like yours (using the Scheduler to make the Observable return elements periodically).

like image 28
MonkeyWithDarts Avatar answered Oct 05 '22 23:10

MonkeyWithDarts