How can I create an Observer over a dynamic list in RxJava?

I need to create an Observer over a list that is constantly changing (elements keep being added).

I am using Observable.from(Iterable), but it seems to create the Observable over the ArrayList as it is at the moment of creation.

I need the Observer to be notified, and the Action executed, every time a new element is added to the ArrayList.

juanpavergara asked Mar 02 '15 18:03



3 Answers

Here is a solution, thanks to Dávid Karnok on the RxJava Google Group:

import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.subjects.PublishSubject;

public class ObservableListExample {

    public static class ObservableList<T> {

        protected final List<T> list;
        protected final PublishSubject<T> onAdd;

        public ObservableList() {
            this.list = new ArrayList<T>();
            this.onAdd = PublishSubject.create();
        }
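        // Store the element, then push it to all current subscribers.
        public void add(T value) {
            list.add(value);
            onAdd.onNext(value);
        }
        // Subscribers receive only the elements added after they subscribe.
        public Observable<T> getObservable() {
            return onAdd;
        }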
    }

    public static void main(String[] args) throws Exception {
        ObservableList<Integer> olist = new ObservableList<>();

        olist.getObservable().subscribe(System.out::println);

        olist.add(1);
        Thread.sleep(1000);
        olist.add(2);
        Thread.sleep(1000);
        olist.add(3);
    }
}
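
One property of the PublishSubject-based wrapper above is that subscribers receive only the elements added after they subscribe; anything added earlier is not replayed. The following is a plain-Java analogy of that behavior, not RxJava code: PushList and its subscribe method are hypothetical names, and subscribers are modeled as java.util.function.Consumer callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical plain-Java analogy of the PublishSubject-backed list; not RxJava.
class PushList<T> {
    private final List<T> items = new ArrayList<>();
    private final List<Consumer<? super T>> listeners = new CopyOnWriteArrayList<>();

    // Registers a listener; it is called only for elements added from now on.
    public void subscribe(Consumer<? super T> listener) {
        listeners.add(listener);
    }

    // Stores the element, then pushes it to every current listener.
    public void add(T value) {
        items.add(value);
        for (Consumer<? super T> listener : listeners) {
            listener.accept(value);
        }
    }

    public static void main(String[] args) {
        PushList<Integer> list = new PushList<>();
        list.add(1);                         // nobody is listening yet
        list.subscribe(System.out::println);
        list.add(2);                         // delivered to the listener
        list.add(3);                         // delivered to the listener
    }
}
```

If earlier elements must also reach late subscribers, a subject that replays (as in the other answers) is the closer fit.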
juanpavergara answered Sep 28 '22 03:09


I would consider this approach, based on BehaviorSubject. It differs from juanpavergara's solution in that the current list is emitted to the Observer immediately on subscription, and the whole list is emitted again after each addition.

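import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.subjects.BehaviorSubject;

public class ObservableList<T> {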

    protected final List<T> list;
    protected final BehaviorSubject<List<T>> behaviorSubject;

    public ObservableList(List<T> list) {
        this.list = list;
        this.behaviorSubject = BehaviorSubject.create(list);
    }

    public Observable<List<T>> getObservable() {
        return behaviorSubject;
    }

    public void add(T element) {
        list.add(element);
        behaviorSubject.onNext(list);
    }
}


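// Demo class; place it in its own file or keep it non-public in the same file.
class ObservableListDemo {

    public static void main(String[] args) {
        final List<Integer> list = new ArrayList<>();
        list.add(0);
        list.add(1);

        final ObservableList<Integer> olist = new ObservableList<>(list);

        // Prints [0, 1] immediately on subscription.
        olist.getObservable().subscribe(System.out::println);

        olist.add(2); // prints [0, 1, 2]
        olist.add(3); // prints [0, 1, 2, 3]
    }
}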

This solution may be useful when implementing MVP: you observe a resource (e.g. a list of objects) returned by one component of the system (e.g. a repository or DataSource), and you want the Observer (e.g. a Presenter or Interactor) to be notified when an element is added to the list in another part of the system.
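
Two details of this approach are worth seeing in isolation: a new subscriber immediately receives the current list, and every subscriber receives the same mutable List instance, so an emission kept by a subscriber changes when later elements are added. The plain-Java sketch below (not RxJava; SnapshotList and subscribe are hypothetical names) mimics the emit-on-subscribe behavior and, as an assumption about what a presenter would want, hands out unmodifiable copies instead of the live list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical plain-Java analogy of the BehaviorSubject-backed list; not RxJava.
class SnapshotList<T> {
    private final List<T> items;
    private final List<Consumer<List<T>>> listeners = new CopyOnWriteArrayList<>();

    public SnapshotList(List<T> initial) {
        this.items = new ArrayList<>(initial); // copy, so outside changes do not leak in
    }

    // Like subscribing to a BehaviorSubject: the current state is delivered right away.
    public void subscribe(Consumer<List<T>> listener) {
        listeners.add(listener);
        listener.accept(snapshot());
    }

    // Adds the element and delivers a fresh snapshot to every listener.
    public void add(T element) {
        items.add(element);
        List<T> current = snapshot();
        for (Consumer<List<T>> listener : listeners) {
            listener.accept(current);
        }
    }

    // Unmodifiable copy; earlier snapshots stay unchanged after later adds.
    private List<T> snapshot() {
        return Collections.unmodifiableList(new ArrayList<>(items));
    }

    public static void main(String[] args) {
        SnapshotList<Integer> list = new SnapshotList<>(Arrays.asList(0, 1));
        list.subscribe(System.out::println); // prints [0, 1] immediately
        list.add(2);                         // prints [0, 1, 2]
        list.add(3);                         // prints [0, 1, 2, 3]
    }
}
```

Copying on every add costs time proportional to the list size, so this trade-off suits small lists; for large ones, emitting only the added element may be preferable.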

GaRRaPeTa answered Sep 28 '22 03:09


You can merge two observables into one: the first emits the initial list of elements, and the second is a subject that emits the elements added later:

import rx.Observable;
import rx.subjects.ReplaySubject;
import java.util.ArrayList;
import java.util.List;

public class ExampleObservableList {
    public static void main(String[] args) {
        List<Integer> initialNumbers = new ArrayList<Integer>();
        initialNumbers.add(1);
        initialNumbers.add(2);

        Observable<Integer> observableInitial = Observable.from(initialNumbers);
        ReplaySubject<Integer> subject = ReplaySubject.create();

        Observable<Integer> source = Observable.merge(observableInitial, subject);

        source.subscribe(System.out::println);

        for (int i = 0; i < 100; ++i) {
            subject.onNext(i);
        }
    }

}

If you don't have initial elements, you can use just a ReplaySubject (or another Subject; see http://reactivex.io/documentation/subject.html):

public static void main(String[] args) {
    ReplaySubject<Integer> source = ReplaySubject.create();
    source.subscribe(System.out::println);

    for (int i = 0; i < 100; ++i) {
        source.onNext(i);
    }
}
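
In the merged example the subscriber sees the initial elements first and then whatever is pushed into the subject, and with a ReplaySubject a subscriber that arrives late also receives everything pushed before it subscribed. A plain-Java sketch of that ordering follows; it is an analogy rather than RxJava code, and ReplayList and subscribe are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java analogy of "initial elements + ReplaySubject"; not RxJava.
// Single-threaded sketch: no synchronization is attempted.
class ReplayList<T> {
    private final List<T> items = new ArrayList<>();
    private final List<Consumer<? super T>> listeners = new ArrayList<>();

    public ReplayList(List<T> initial) {
        items.addAll(initial);
    }

    // Replays everything stored so far, then registers the listener for future adds.
    public void subscribe(Consumer<? super T> listener) {
        for (T item : new ArrayList<>(items)) {
            listener.accept(item);
        }
        listeners.add(listener);
    }

    // Stores the element and pushes it to every registered listener.
    public void add(T value) {
        items.add(value);
        for (Consumer<? super T> listener : new ArrayList<>(listeners)) {
            listener.accept(value);
        }
    }

    public static void main(String[] args) {
        ReplayList<Integer> list = new ReplayList<>(Arrays.asList(1, 2));
        list.add(3);
        list.subscribe(System.out::println); // replays 1, 2, 3
        list.add(4);                         // then prints 4
    }
}
```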
krynio answered Sep 28 '22 05:09