Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Queue like Subject in RxJava

Tags:

I'm looking for a Subject (or something similar) that can:

  1. Could receive items and hold them in a queue or buffer if there are no subscribers
  2. Once we have a subscriber all items are consumed and never emitted again
  3. I can subscribe/unsubscribe to/from Subject

BehaviorSubject almost would do the job, but it retains last observed item.

UPDATE

Based on accepted answer I worked out similar solution for single observed item. Also added unsubscription part to avoid memory leaks.

class LastEventObservable private constructor(
        private val onSubscribe: OnSubscribe<Any>,
        private val state: State
) : Observable<Any>(onSubscribe) {

    fun emit(value: Any) {
        if (state.subscriber.hasObservers()) {
            state.subscriber.onNext(value)
        } else {
            state.lastItem = value
        }
    }

    companion object {
        fun create(): LastEventObservable {
            val state = State()

            val onSubscribe = OnSubscribe<Any> { subscriber ->
                just(state.lastItem)
                        .filter { it != null }
                        .doOnNext { subscriber.onNext(it) }
                        .doOnCompleted { state.lastItem = null }
                        .subscribe()

                val subscription = state.subscriber.subscribe(subscriber)

                subscriber.add(Subscriptions.create { subscription.unsubscribe() })
            }

            return LastEventObservable(onSubscribe, state)
        }
    }

    private class State {
        var lastItem: Any? = null
        val subscriber = PublishSubject.create<Any>()
    }
}
like image 495
Martynas Jurkus Avatar asked Jun 09 '16 14:06

Martynas Jurkus


2 Answers

I achieve the expected result creating a customized Observable that wraps a publish subject and handles emission cache if there's no subscribers attached. Check it out.

public class ExampleUnitTest {
    @Test
    public void testSample() throws Exception {
        MyCustomObservable myCustomObservable = new MyCustomObservable();

        myCustomObservable.emit("1");
        myCustomObservable.emit("2");
        myCustomObservable.emit("3");

        Subscription subscription = myCustomObservable.subscribe(System.out::println);

        myCustomObservable.emit("4");
        myCustomObservable.emit("5");

        subscription.unsubscribe();

        myCustomObservable.emit("6");
        myCustomObservable.emit("7");
        myCustomObservable.emit("8");

        myCustomObservable.subscribe(System.out::println);
    }
}

class MyCustomObservable extends Observable<String> {
    private static PublishSubject<String> publishSubject = PublishSubject.create();
    private static List<String> valuesCache = new ArrayList<>();

    protected MyCustomObservable() {
        super(subscriber -> {
            Observable.from(valuesCache)
                    .doOnNext(subscriber::onNext)
                    .doOnCompleted(valuesCache::clear)
                    .subscribe();

            publishSubject.subscribe(subscriber);
        });
    }

    public void emit(String value) {
        if (publishSubject.hasObservers()) {
            publishSubject.onNext(value);
        } else {
            valuesCache.add(value);
        }
    }
}

Hope that it helps!

Best Regards.

like image 154
Rodrigo Henriques Avatar answered Sep 19 '22 13:09

Rodrigo Henriques


If you only want to wait for a single subscriber, use UnicastSubject but note that if you unsubscribe in the middle, all subsequent queued items will be lost.

Edit:

Once we have a subscriber all items are consumed and never emitted again

For multiple subscribers, use ReplaySubject.

like image 24
akarnokd Avatar answered Sep 22 '22 13:09

akarnokd