Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJava only check the first response item with timeout

I see that ReactiveX (RxJava) has an operator timeout, which will apply to every item in a subscription stream. But I only want to check the very first response with a timeout and do not care about timeouts for the following responses. How can I implement this requirement elegantly with RxJava's operators?

like image 377
bitdancer Avatar asked Sep 03 '16 07:09

bitdancer


2 Answers

Kotlin extension for @ndori's answer

fun <T> Observable<T>.timeoutFirstMessage(timeout: Long, unit: TimeUnit): Observable<T> {
    return this.timeout<Long, Long>(
        Observable.timer(timeout, unit),
        Function { Observable.never<Long>() }
    )
}
like image 146
Marcio Granzotto Avatar answered Sep 17 '22 12:09

Marcio Granzotto


Best option is to use a timeout overload which returns a timeout observable for every item, and has one for the subscription as well (which is the one you are interested in).

observable.timeout(
  () -> Observable.empty().delay(10, TimeUnit.SECONDS),
  o -> Observable.never()
)

I'll explain, the first func0 will run on subscribe, and will emit an empty observable (which emits complete) delayed by the time you want. if the time passes before any item arrived there will be a timeout like you wanted. the second parameter func1 will decide timeouts between items, which you have no use for so we just passes never (which does not complete or do anything)

Another option is following Luciano suggestion, you can do it like this:

    public static class TimeoutFirst<T> implements Transformer<T,T> {

    private final long timeout;
    private final TimeUnit unit;

    private TimeoutFirst(long timeout, TimeUnit unit) {
        this.timeout = timeout;
        this.unit = unit;
    }

    @Override
    public Observable<T> call(Observable<T> observable) {
        return Observable.amb(observable,
                Observable.timer(timeout, unit).flatMap(aLong -> Observable.error(new TimeoutException("Timeout after " + timeout + " "  + unit.name()))));
    }
}

public static <T> Transformer<T, T> timeoutFirst(long timeout, TimeUnit seconds) {
    return new TimeoutFirst<>(timeout, seconds);
}

which is a pretty neat solution using amb.

like image 37
ndori Avatar answered Sep 19 '22 12:09

ndori