Rx: Combine ThrottleFirst and Sample operators



Given a source observable S, how can I ask RxJava / Rx to produce observable D, that:

  1. Emits first item from S without any delay
  2. Waits at least T seconds after emitting every item and before emitting next item L, where L is the last item emitted by S during the waiting period
  3. Emits the next item immediately after it appers in S, if S didn't produce any item during the waiting period T (from the point #2)

Marble diagram:

enter image description here

I thought to use:

  • Sample operator, but it does not satisfy the requirement #3.
  • Debounce operator, but it also does not satisfy the requirement #3.
  • ThrottleFirst operator but it does not satisfy the requirement #2, because it does not remember L (while Sample does that).

I would prefer the most simple answer, that utilises standard operators (if it is possible).

1 Answers

If one is limited to standard operators only, this could be achieved by using publish and switching between two collection modes: direct, and buffer with time. In the latter mode, if the buffer turns out to be empty, switch back to the direct mode:

import java.util.concurrent.TimeUnit;

import org.junit.Test;

import io.reactivex.*;
import io.reactivex.schedulers.TestScheduler;

public class ThrottleSampleTest {

    public void test() {
        TestScheduler tsch = new TestScheduler();

                100,                // should emit 100 at T=100
                110, 120, 130, 150, // should emit 150 at T=200 
                250, 260,           // should emit 260 at T=300
                400                 // should emit 400 at T=400
        .flatMap(v -> Flowable.timer(v, TimeUnit.MILLISECONDS, tsch).map(w -> v))
        .compose(throttleFirstSample(100, TimeUnit.MILLISECONDS, tsch))
        .subscribe(v -> 
            System.out.println(v + " at T=" + tsch.now(TimeUnit.MILLISECONDS))

        tsch.advanceTimeBy(1, TimeUnit.SECONDS);

    static final Exception RESTART_INDICATOR = new Exception();

    static <T> FlowableTransformer<T, T> throttleFirstSample(
            long time, TimeUnit unit, Scheduler scheduler) {
        return f ->
            .publish(g ->
                    .buffer(time, unit, scheduler)
                    .map(v -> {
                        if (v.isEmpty()) {
                            throw RESTART_INDICATOR;
                        return v.get(v.size() - 1);
                .retry(e -> e == RESTART_INDICATOR)

Edit: The alternative is to have a custom operator:

public void testObservable() {
    TestScheduler tsch = new TestScheduler();

            100,                // should emit 100 at T=100
            110, 120, 130, 150, // should emit 150 at T=200 
            250, 260,           // should emit 260 at T=300
            400                 // should emit 400 at T=400
    .flatMap(v -> Observable.timer(v, TimeUnit.MILLISECONDS, tsch).map(w -> v))
    .compose(throttleFirstSampleObservable(100, TimeUnit.MILLISECONDS, tsch))
    .subscribe(v -> System.out.println(v + " at T=" + tsch.now(TimeUnit.MILLISECONDS)));

    tsch.advanceTimeBy(1, TimeUnit.SECONDS);

static <T> ObservableTransformer<T, T> throttleFirstSampleObservable(
        long time, TimeUnit unit, Scheduler scheduler) {
    return f -> new Observable<T>() {
        protected void subscribeActual(Observer<? super T> observer) {
            f.subscribe(new ThrottleFirstSampleObserver<T>(
                observer, time, unit, scheduler.createWorker()));

static final class ThrottleFirstSampleObserver<T> 
extends AtomicInteger
implements Observer<T>, Disposable, Runnable {

    private static final long serialVersionUID = 205628968660185683L;

    static final Object TIMEOUT = new Object();

    final Observer<? super T> actual;

    final Queue<Object> queue;

    final Worker worker;

    final long time;

    final TimeUnit unit;

    Disposable upstream;

    boolean latestMode;

    T latest;

    volatile boolean done;
    Throwable error;

    volatile boolean disposed;

    ThrottleFirstSampleObserver(Observer<? super T> actual, 
            long time, TimeUnit unit, Worker worker) {
        this.actual = actual;
        this.time = time;
        this.unit = unit;
        this.worker = worker;
        this.queue = new ConcurrentLinkedQueue<Object>();

    public void onSubscribe(Disposable d) {
        upstream = d;

    public void onNext(T t) {

    public void onError(Throwable e) {
        error = e;
        done = true;

    public void onComplete() {
        done = true;

    public boolean isDisposed() {
        return upstream.isDisposed();

    public void dispose() {
        disposed = true;
        if (getAndIncrement() == 0) {
            latest = null;

    public void run() {

    void drain() {
        if (getAndIncrement() != 0) {

        int missed = 1;
        Observer<? super T> a = actual;
        Queue<Object> q = queue;

        for (;;) {

            for (;;) {
                if (disposed) {
                    latest = null;

                boolean d = done;
                Object v = q.poll();
                boolean empty = v == null;

                if (d && empty) {
                    if (latestMode) {
                        T u = latest;
                        latest = null;
                        if (u != null) {
                    Throwable ex = error;
                    if (ex != null) {
                    } else {

                if (empty) {

                if (latestMode) {
                    if (v == TIMEOUT) {
                        T u = latest;
                        latest = null;
                        if (u != null) {
                            worker.schedule(this, time, unit);
                        } else {
                            latestMode = false;
                    } else {
                        latest = (T)v;
                } else {
                    latestMode = true;
                    worker.schedule(this, time, unit);

            missed = addAndGet(-missed);
            if (missed == 0) {
