I need to create an Observer over an array which is constantly changing (adding elements).
I am using Observable.from(Iterable), but it seems to create the Observable over the ArrayList as it is at the moment of creation.
I need the Observer to be notified, and the Action to be executed, every time a new element is added to the ArrayList.
The Observable class provides convenient factory methods for creating observables. just(T item) returns an Observable that signals the given (constant reference) item and then completes. fromIterable(Iterable source) converts an Iterable sequence into an ObservableSource that emits the items in the sequence (this is the RxJava 2 name; in RxJava 1, which the rx.* imports in the code here belong to, the equivalent is Observable.from(Iterable)).
just() is one of the easiest and most convenient ways to create an Observable. It constructs a reactive type from a pre-existing object and emits that specific object to the downstream consumer upon subscription.
RxJava is an open source library for Java and Android that helps you create reactive code. It's heavily inspired by functional programming. RxJava implements the Observer pattern with two main interfaces: Observable and Observer.
There you go. Thanks to Dávid Karnok on the RxJava Google Group:
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.subjects.PublishSubject;

public class ObservableListExample {
    public static class ObservableList<T> {
        protected final List<T> list;
        protected final PublishSubject<T> onAdd;

        public ObservableList() {
            this.list = new ArrayList<T>();
            this.onAdd = PublishSubject.create();
        }

        public void add(T value) {
            list.add(value);
            onAdd.onNext(value);
        }

        public Observable<T> getObservable() {
            return onAdd;
        }
    }

    public static void main(String[] args) throws Exception {
        ObservableList<Integer> olist = new ObservableList<>();
        olist.getObservable().subscribe(System.out::println);
        olist.add(1);
        Thread.sleep(1000);
        olist.add(2);
        Thread.sleep(1000);
        olist.add(3);
    }
}
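One thing worth keeping in mind with the PublishSubject version: a subscriber only receives elements added after it subscribed. Here is a rough plain-Java sketch of that behaviour, without RxJava, so it can be run with just the JDK. NotifyingList and NotifyingListDemo are hypothetical names made up for illustration, and the sketch is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a PublishSubject-backed list: plain Java, no RxJava.
class NotifyingList<T> {
    private final List<T> items = new ArrayList<>();
    private final List<Consumer<T>> listeners = new ArrayList<>();

    // Register a listener; it sees only elements added after this call.
    void subscribe(Consumer<T> listener) {
        listeners.add(listener);
    }

    // Store the element, then push it to every current listener.
    void add(T value) {
        items.add(value);
        for (Consumer<T> listener : listeners) {
            listener.accept(value);
        }
    }
}

class NotifyingListDemo {
    // Returns what a late subscriber receives when 1 is added before it subscribes.
    static List<Integer> run() {
        NotifyingList<Integer> list = new NotifyingList<>();
        List<Integer> seen = new ArrayList<>();
        list.add(1);                 // nobody is listening yet, so this is missed
        list.subscribe(seen::add);
        list.add(2);
        list.add(3);
        return seen;                 // [2, 3]
    }
}
```

If late subscribers also need the earlier elements, a replaying or latest-value approach is probably a better fit than a publish-only one.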
I would consider this approach based on BehaviorSubject. It differs from juanpavergara's solution in that the current list is emitted to the Observer immediately upon subscribing to the Observable.
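import java.util.List;
import rx.Observable;
import rx.subjects.BehaviorSubject;

public class ObservableList<T> {
    protected final List<T> list;
    protected final BehaviorSubject<List<T>> behaviorSubject;

    public ObservableList(List<T> list) {
        this.list = list;
        // Seed the subject so new subscribers immediately get the current list.
        this.behaviorSubject = BehaviorSubject.create(list);
    }

    public Observable<List<T>> getObservable() {
        return behaviorSubject;
    }

    public void add(T element) {
        list.add(element);
        // Emit the (same, mutated) list instance after every addition.
        behaviorSubject.onNext(list);
    }
}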
private void main() {
    final List<Integer> list = new ArrayList<>();
    list.add(0);
    list.add(1);
    final ObservableList<Integer> olist = new ObservableList<>(list);
    olist.getObservable().subscribe(System.out::println);
    olist.add(2);
    olist.add(3);
}
This solution may be useful when implementing MVP, when you want to observe one resource (e.g. a list of objects) returned by one component in the system (e.g. a repository or DataSource), and you want the Observer (e.g. a Presenter or Interactor) to be notified when an element is added to the list in another part of the system.
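Note that the BehaviorSubject version emits the same mutable list instance every time, so an Observer that keeps a reference to an earlier emission will see it grow. Below is a rough plain-Java sketch (no RxJava) of the same "emit the current list on subscribe, then on every add" idea, but handing out unmodifiable copies instead. SnapshotList and SnapshotListDemo are hypothetical names, copying on every add is my assumption rather than part of the answer above, and the sketch is not thread-safe.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java analogue of the BehaviorSubject approach (no RxJava).
class SnapshotList<T> {
    private final List<T> items;
    private final List<Consumer<List<T>>> listeners = new ArrayList<>();

    SnapshotList(List<T> initial) {
        this.items = new ArrayList<>(initial); // defensive copy of the caller's list
    }

    // New subscribers immediately receive the current contents.
    void subscribe(Consumer<List<T>> listener) {
        listeners.add(listener);
        listener.accept(snapshot());
    }

    // After each add, every subscriber receives a fresh snapshot.
    void add(T element) {
        items.add(element);
        List<T> current = snapshot();
        for (Consumer<List<T>> listener : listeners) {
            listener.accept(current);
        }
    }

    // Unmodifiable copy, so earlier emissions do not change when the list grows.
    private List<T> snapshot() {
        return Collections.unmodifiableList(new ArrayList<>(items));
    }
}

class SnapshotListDemo {
    // Mirrors the example above: start with [0, 1], subscribe, then add 2 and 3.
    static List<List<Integer>> run() {
        List<Integer> initial = new ArrayList<>();
        initial.add(0);
        initial.add(1);
        SnapshotList<Integer> list = new SnapshotList<>(initial);
        List<List<Integer>> emissions = new ArrayList<>();
        list.subscribe(emissions::add);
        list.add(2);
        list.add(3);
        return emissions; // [[0, 1], [0, 1, 2], [0, 1, 2, 3]]
    }
}
```

The trade-off is an extra copy per addition, which is usually fine for small lists but may matter for large ones.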
You can merge two observables into one. One of them emits the initial list of elements, and the second is a subject:
import rx.Observable;
import rx.subjects.ReplaySubject;
import java.util.ArrayList;
import java.util.List;

public class ExampleObservableList {
    public static void main(String[] args) {
        List<Integer> initialNumbers = new ArrayList<Integer>();
        initialNumbers.add(1);
        initialNumbers.add(2);
        Observable<Integer> observableInitial = Observable.from(initialNumbers);
        ReplaySubject<Integer> subject = ReplaySubject.create();
        Observable<Integer> source = Observable.merge(observableInitial, subject);
        source.subscribe(System.out::println);
        for (int i = 0; i < 100; ++i) {
            subject.onNext(i);
        }
    }
}
If you don't have initial elements, you can use a ReplaySubject on its own (or another Subject; see http://reactivex.io/documentation/subject.html):
public static void main(String[] args) {
    ReplaySubject<Integer> source = ReplaySubject.create();
    source.subscribe(System.out::println);
    for (int i = 0; i < 100; ++i) {
        source.onNext(i);
    }
}
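To make the replay idea concrete: an unbounded ReplaySubject buffers what it has emitted and plays it back to anyone who subscribes later. Here is a minimal plain-Java sketch of that behaviour, without RxJava. ReplayingSource and ReplayingSourceDemo are hypothetical names for illustration only; the sketch keeps the full history in memory and is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical plain-Java analogue of an unbounded ReplaySubject (no RxJava).
class ReplayingSource<T> {
    private final List<T> history = new ArrayList<>();
    private final List<Consumer<T>> listeners = new ArrayList<>();

    // Replay everything emitted so far, then keep the listener for future items.
    void subscribe(Consumer<T> listener) {
        for (T past : history) {
            listener.accept(past);
        }
        listeners.add(listener);
    }

    // Record the value and forward it to every current listener.
    void onNext(T value) {
        history.add(value);
        for (Consumer<T> listener : listeners) {
            listener.accept(value);
        }
    }
}

class ReplayingSourceDemo {
    // A subscriber that arrives after 0 and 1 still receives them, followed by 2.
    static List<Integer> run() {
        ReplayingSource<Integer> source = new ReplayingSource<>();
        source.onNext(0);
        source.onNext(1);
        List<Integer> seen = new ArrayList<>();
        source.subscribe(seen::add);
        source.onNext(2);
        return seen; // [0, 1, 2]
    }
}
```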