I'm new to RxJava, and I need to use the Observable feature in an asynchronous way.
I also need to use timeouts: in my example, I want every process to end in 1 second or less.
Here is what I've done so far:
public static void hello(String name) throws IOException {
    Observable<String> obs2 = Observable.just(name).timeout(1000, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io());
    obs2.subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            if ("CCCCC".equals(s)) {
                try {
                    Thread.sleep(3200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println(s + " " + new Date() + " " + Thread.currentThread().getName());
        }
    });
}
public static void main(final String[] args) throws InterruptedException, IOException {
    hello("AAAAA");
    hello("CCCCC");
    hello("BBBBBB");
    System.in.read();
}
Result:
AAAAA Thu Oct 05 09:43:46 CEST 2017 RxIoScheduler-2
BBBBBB Thu Oct 05 09:43:46 CEST 2017 RxIoScheduler-4
CCCCC Thu Oct 05 09:43:49 CEST 2017 RxIoScheduler-3
I was actually expecting to get a TimeoutException from the thread named "RxIoScheduler-3" since it has been sleeping for 3 seconds.
What's wrong with my code and my approach to timeouts in RxJava?
Thank you for helping me.
In reactive programming, Rx lets you add a timeout to the stream you subscribe to. In the basic use case, the stream signals a TimeoutException when it does not emit an item within the allotted time.
A TimeoutException is the exception thrown when the time allotted for a process or operation has expired.
RxJava error handling: once an error has occurred, the stream is finished and no more events can come through it. If the consumer does not handle the error in its Observer callback, the error is sent to a global error handler (which on Android crashes the app by default).
According to the docs, the timeout operator will:
mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted items
So a timeout occurs only when there is a delay in emitting items. Your code delays the consumption of an item instead, and that does not cause a timeout.
If you rework your code to pause during emission then a timeout will occur. For example:
public static void hello(String name) throws IOException {
    Observable<String> obs2 = Observable.fromCallable(() -> {
                if ("CCCCC".equals(name)) {
                    // pause for 150ms before emitting "CCCCC"
                    try {
                        Thread.sleep(150);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return name;
            })
            // time out if more than 100ms passes without an emission
            .timeout(100, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io());
    obs2.subscribe(
            // onNext: print the emitted item
            s -> System.out.println(s + " " + new Date() + " " + Thread.currentThread().getName()),
            // onError: print the error type (TimeoutException for "CCCCC")
            throwable -> System.err.println(throwable.getClass().getSimpleName() + " " + new Date() + " " + Thread.currentThread().getName()));
}
Using the above form of hello(), you'll get the following output written to the console:
AAAAA Thu Oct 05 10:10:33 IST 2017 RxIoScheduler-2
BBBBBB Thu Oct 05 10:10:33 IST 2017 RxIoScheduler-4
TimeoutException Thu Oct 05 10:10:33 IST 2017 RxComputationScheduler-1
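The same rule (a timeout watches how long a value takes to be produced, not how long it takes to be consumed) can also be seen without RxJava. Here is a minimal sketch using only java.util.concurrent, so it is an analogy and not the RxJava timeout operator itself. The class name TimeoutScopeDemo, the helper methods and the 500/700/1500 ms values are all made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: plain java.util.concurrent, not RxJava.
class TimeoutScopeDemo {

    // Runs the producer on a worker thread and waits at most timeoutMs for its value.
    // Returns the value, or the string "TimeoutException" if it was not produced in time.
    static String produceWithTimeout(Callable<String> producer, long timeoutMs) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return pool.submit(producer).get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return e.getClass().getSimpleName();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // interrupts the worker if it is still running
        }
    }

    // Like the code in the question: the value is produced immediately,
    // and the delay happens afterwards, while consuming it.
    static String fastProducerSlowConsumer() {
        String value = produceWithTimeout(() -> "CCCCC", 500);
        try {
            Thread.sleep(700); // slow consumer, longer than the timeout, but nobody is timing it
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    // Like the reworked code: the delay happens while producing the value.
    static String slowProducer() {
        return produceWithTimeout(() -> {
            Thread.sleep(1500); // slow producer, longer than the 500ms timeout
            return "CCCCC";
        }, 500);
    }

    public static void main(String[] args) {
        System.out.println(fastProducerSlowConsumer()); // prints CCCCC
        System.out.println(slowProducer());             // prints TimeoutException
    }
}
```

Only the second call times out, because only there does the wait for the value exceed the limit. If the slow work itself has to be bounded, it needs to sit on the producing side of the timeout.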