I have a class Producer; simplified, it has a method public Object readData().
I want to turn this class into an Observable (RxJava).
How do I indicate which method should be called? Do I need to transform my Producer class into a Future or an Iterable?
The next problem is that readData should be called every n seconds.
Some methods, for instance from, have a scheduler parameter, but I cannot find any example of how to apply it.
I found the interval method, but it emits a sequence of integers.
So far, without Observable, I use Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(....)
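For reference, the executor-based polling described in the question might look roughly like the sketch below. It uses only java.util.concurrent, not RxJava. The Producer body, the PollingBaseline class, the poll helper, and the fixed number of reads are all assumptions made for illustration; the real Producer and its delay are not shown in the question.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class PollingBaseline {
    // Hypothetical stand-in for the Producer in the question.
    static class Producer {
        private int counter = 0;

        public synchronized Object readData() {
            return "item-" + (counter++);
        }
    }

    // Reads from the producer `count` times, waiting `delayMillis` between
    // the end of one read and the start of the next, then returns the results.
    static List<Object> poll(Producer producer, int count, long delayMillis) {
        List<Object> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(() -> {
            // Skip any run that fires after the requested number of reads.
            if (done.getCount() > 0) {
                results.add(producer.readData());
                done.countDown();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Stop the periodic task so the worker thread can exit.
            executor.shutdownNow();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(poll(new Producer(), 3, 100)); // prints [item-0, item-1, item-2]
    }
}
```

The caller here has to manage the executor's lifecycle by hand, which is the part an Observable-based approach is meant to hide.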
You can still use the interval() method; just ignore its result!
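final Producer producer = ...;
int n = ...;
Observable<Object> obs =
    Observable.interval(n, TimeUnit.SECONDS)
        // interval() emits Long tick counters, so the function takes a Long
        .map(new Func1<Long, Object>() {
            @Override
            public Object call(Long i) {
                // Ignore the tick counter and read from the producer instead
                return producer.readData();
            }
        });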
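To make the "ignore the tick" idea concrete without depending on RxJava, here is a small plain-Java sketch. TickMapping, ignoringTick, and mapTicks are hypothetical names invented for this illustration, and the ticks are simulated in a loop with no real delay; it only shows that the counter value is discarded and the producer's value is passed on instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

class TickMapping {
    // Wraps a supplier as a function of the tick counter that discards the counter.
    static <T> Function<Long, T> ignoringTick(Supplier<T> source) {
        return tick -> source.get();
    }

    // Simulates `ticks` emissions (0, 1, 2, ...) and maps each one, with no real delay.
    static <T> List<T> mapTicks(long ticks, Function<Long, T> mapper) {
        List<T> out = new ArrayList<>();
        for (long i = 0; i < ticks; i++) {
            out.add(mapper.apply(i));
        }
        return out;
    }

    public static void main(String[] args) {
        AtomicInteger reads = new AtomicInteger();
        // Stand-in for producer.readData(): returns a new value on each call.
        Supplier<String> readData = () -> "read-" + reads.getAndIncrement();
        System.out.println(mapTicks(3, ignoringTick(readData))); // prints [read-0, read-1, read-2]
    }
}
```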
I am new to RxJava, but nobody else has answered your question, so here is a stab at it.
My suggestion is for your Producer class to implement the interface Observable.OnSubscribeFunc (Javadoc). In the method public Subscription onSubscribe(Observer<T> observer), you can call your Producer#readData() method to get whatever data is currently available and pass it to the observer's onNext() method, like this:
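// Make sure to change T as appropriate
public class Producer implements OnSubscribeFunc<T> {
    . . .
    @Override
    public Subscription onSubscribe(Observer<T> observer) {
        while (this.hasData()) {
            observer.onNext(this.readData());
        }
        // Signal completion once, after all available data has been emitted
        observer.onCompleted();
        // Nothing to clean up on unsubscribe in this simple case
        return Subscriptions.empty();
    }
}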
(This assumes that your Producer#readData() method returns data in chunks rather than all at once. Tweak as needed.)
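The emission pattern this relies on (each chunk goes to onNext, followed by a single completion signal once the data runs out) can be tried out without RxJava. In the sketch below, SimpleObserver, ChunkedProducer, drainTo, and record are hypothetical names made up for illustration; they are loosely modelled on the RxJava types but are not part of that library.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class DrainSketch {
    // Hypothetical minimal observer, loosely modelled on RxJava's Observer.
    interface SimpleObserver<T> {
        void onNext(T value);
        void onCompleted();
    }

    // Hypothetical producer that hands out its data one chunk at a time.
    static class ChunkedProducer {
        private final Deque<String> chunks;

        ChunkedProducer(String... chunks) {
            this.chunks = new ArrayDeque<>(Arrays.asList(chunks));
        }

        boolean hasData() {
            return !chunks.isEmpty();
        }

        String readData() {
            return chunks.poll();
        }

        // Emits every available chunk, then signals completion exactly once.
        void drainTo(SimpleObserver<String> observer) {
            while (hasData()) {
                observer.onNext(readData());
            }
            observer.onCompleted();
        }
    }

    // Records the sequence of callbacks produced for the given chunks.
    static List<String> record(String... chunks) {
        List<String> events = new ArrayList<>();
        new ChunkedProducer(chunks).drainTo(new SimpleObserver<String>() {
            @Override
            public void onNext(String value) {
                events.add("next:" + value);
            }

            @Override
            public void onCompleted() {
                events.add("completed");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record("a", "b")); // prints [next:a, next:b, completed]
    }
}
```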
You can then wrap your Producer object in an Observable using the Observable#create(OnSubscribeFunc<T>) method, and also arrange to have your Observable return elements on a timer:
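// In your observer class
Observable<T> myProducer = Observable.create(new Producer(...));
// scheduleWithFixedDelay(....) returns a ScheduledFuture, not an executor,
// so keep a reference to the executor itself
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
// subscribeOn runs the subscription (the onSubscribe call) on the executor's thread
Observable<T> myProducerTimed = myProducer.subscribeOn(Schedulers.executor(scheduler));
// Now use myProducerTimed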
I can't test those last two lines right now. Sorry, I'll look into that and update my answer if it changes things.
DISCLAIMER: I am an RxJava n00b, so this solution might suck or might be needlessly messy. Fix what seems wrong.
UPDATE: I had a problem (RxJava -- Terminating Infinite Streams) with RxJava as well; looks like my solution will look a lot like yours (using the Scheduler to make the Observable return elements periodically).