I see that ReactiveX (RxJava) has a timeout operator, which applies to every item in a subscription stream. But I only want to apply a timeout to the very first response and do not care about timeouts for the following responses. How can I implement this requirement elegantly with RxJava's operators?
Kotlin extension for @ndori's answer
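// Imports assume RxJava 2; RxJava 3 uses the io.reactivex.rxjava3 packages instead.
import io.reactivex.Observable
import io.reactivex.functions.Function
import java.util.concurrent.TimeUnit

fun <T> Observable<T>.timeoutFirstMessage(timeout: Long, unit: TimeUnit): Observable<T> {
    return this.timeout<Long, Long>(
        Observable.timer(timeout, unit),       // fires if no first item arrives in time
        Function { Observable.never<Long>() }  // never fires between later items
    )
}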
The best option is to use the timeout overload that takes a timeout observable for every item plus a separate one for the subscription itself (which is the one you are interested in):
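observable.timeout(
    // Timeout for the first item: completes 10 seconds after subscription.
    () -> Observable.empty().delay(10, TimeUnit.SECONDS),
    // Timeout between later items: never signals, so it never fires.
    o -> Observable.never()
)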
To explain: the first function (a Func0) runs on subscription and returns an empty observable whose completion is delayed by the time you want. If that time passes before any item arrives, a timeout is raised, as you wanted. The second function (a Func1) determines the timeout between items, which you have no use for, so we just pass never() (which neither emits nor completes).
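To make the difference between the two timeout rules concrete, here is a small plain-Java model of them. It is only a sketch of the semantics and not RxJava code: the class TimeoutModel and both method names are made up for illustration, and the input is assumed to be the time (in milliseconds since subscription) of each notification from the source, i.e. the items followed by the final completion.

```java
import java.util.List;

public class TimeoutModel {
    // Per-item rule: every gap between consecutive notifications, including the
    // gap from subscription (t = 0) to the first one, must stay within the limit.
    static boolean perItemTimesOut(List<Long> eventTimesMs, long limitMs) {
        if (eventTimesMs.isEmpty()) {
            return true; // assumption: an empty list models a source that never signals
        }
        long previous = 0;
        for (long time : eventTimesMs) {
            if (time - previous > limitMs) {
                return true;
            }
            previous = time;
        }
        return false;
    }

    // First-item rule: only the gap from subscription to the first notification
    // is checked; later gaps can be arbitrarily long.
    static boolean firstItemTimesOut(List<Long> eventTimesMs, long limitMs) {
        return eventTimesMs.isEmpty() || eventTimesMs.get(0) > limitMs;
    }

    public static void main(String[] args) {
        // First item after 100 ms, then long pauses before the next notifications.
        List<Long> events = List.of(100L, 5_000L, 12_000L);
        System.out.println(perItemTimesOut(events, 1_000));   // prints true
        System.out.println(firstItemTimesOut(events, 1_000)); // prints false
    }
}
```

With a 1-second limit, the per-item rule trips on the 4.9-second pause after the first item, while the first-item rule is satisfied as soon as the first item shows up after 100 ms. The second behaviour is what passing never() as the per-item timeout gives you.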
Another option, following Luciano's suggestion, is to do it like this:
// RxJava 1.x: Transformer here is rx.Observable.Transformer.
public static class TimeoutFirst<T> implements Transformer<T, T> {
    private final long timeout;
    private final TimeUnit unit;

    private TimeoutFirst(long timeout, TimeUnit unit) {
        this.timeout = timeout;
        this.unit = unit;
    }

    @Override
    public Observable<T> call(Observable<T> observable) {
        // amb mirrors whichever observable signals first: the source, or a timer that errors.
        return Observable.amb(observable,
                Observable.timer(timeout, unit).flatMap(aLong ->
                        Observable.error(new TimeoutException("Timeout after " + timeout + " " + unit.name()))));
    }
}

public static <T> Transformer<T, T> timeoutFirst(long timeout, TimeUnit unit) {
    return new TimeoutFirst<>(timeout, unit);
}
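This is a pretty neat solution using amb: amb mirrors whichever of its observables signals first, so once the source delivers its first item the timer is unsubscribed and can no longer raise a timeout.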