
RxJava - observe data that might always change




I'm trying to jump on the reactive bandwagon, but after reading and going through lots of examples, I still haven't found what I'm looking for.

I have a model object which might change at any time during the lifecycle of the application.
The changes might come from a specific request to update it (from servers, db, etc.) or it might get updated due to an event which fires in the app.

My question is how do I create an Observable of such an object and how do I keep updating the subscribers when there's a change?

From what I've seen so far I can create the observable like so:

Observable<MyModel> observable = Observable.create(new Observable.OnSubscribe<MyModel>() {
    @Override
    public void call(final Subscriber<? super MyModel> subscriber) {
        subscriber.onNext(myModel); // myModel: an existing MyModel instance
    }
});

What I'm missing here:

  1. I do not want to emit different values (instances of MyModel); I just want to let the subscribers know that the same instance (the one they subscribed to) has changed.

  2. If I understand correctly, the call method is invoked whenever a new subscriber registers, but that's not what I need. I need to act only when there's an update, and then I want to notify ALL subscribers.

There's a good chance that I just got this all wrong, which is why I'd be happy to understand how it's possible to accomplish my needs with RxJava.


Nitzan Tomer asked Jul 21 '16 12:07


1 Answer

I think the best option for something like this is a Subject. You can have something like this:

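// RxJava 1.x imports: rx.Observable, rx.subjects.PublishSubject
public static class MyModel {
    // Hot source: emits to every current subscriber whenever onNext is called
    private final PublishSubject<MyModel> changeObservable = PublishSubject.create();
    private String field1;

    public String getField1() {
        return field1;
    }

    public void setField1(String field1) {
        this.field1 = field1;
        // Notify all subscribers that this same instance has changed
        changeObservable.onNext(this);
    }

    public Observable<MyModel> getModelChanges() {
        return changeObservable;
    }

    @Override
    public String toString() {
        return "MyModel{" +
                "field1='" + field1 + '\'' +
                '}';
    }
}

public static void main(String[] args) {
    MyModel myModel = new MyModel();
    // Subscribe first, then change the model; each setter call prints the model
    myModel.getModelChanges().subscribe(System.out::println);
    myModel.setField1("1");
    myModel.setField1("2");
}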

Every setter in the model then notifies that something has changed, and all subscribers receive the updated model.
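
To make the mechanics behind this more concrete, here is a minimal plain-Java sketch of the same idea without RxJava. It is an illustration only, not RxJava's API: `ObservableModel`, its `subscribe` method, and `ObservableModelDemo` are hypothetical names. It assumes a single shared listener list, so subscribing runs no producer code, and every setter call pushes the same instance to all current listeners. As with a PublishSubject, a listener that subscribes late does not see earlier changes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for the model; a hand-rolled listener list, not RxJava.
class ObservableModel {
    // One shared list, so every subscriber hears about every change.
    private final List<Consumer<ObservableModel>> listeners = new CopyOnWriteArrayList<>();
    private String field1;

    // Subscribing only joins the list; nothing is emitted at this point.
    Runnable subscribe(Consumer<ObservableModel> listener) {
        listeners.add(listener);
        return () -> listeners.remove(listener); // run() to unsubscribe
    }

    String getField1() {
        return field1;
    }

    void setField1(String field1) {
        this.field1 = field1;
        // Push the same instance to all current listeners.
        for (Consumer<ObservableModel> listener : listeners) {
            listener.accept(this);
        }
    }
}

class ObservableModelDemo {
    // Returns the notifications in the order they were delivered.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ObservableModel model = new ObservableModel();

        Runnable unsubscribeA = model.subscribe(m -> log.add("A saw " + m.getField1()));
        model.setField1("first");   // only A is subscribed

        model.subscribe(m -> log.add("B saw " + m.getField1()));
        model.setField1("second");  // A and B are both notified

        unsubscribeA.run();
        model.setField1("third");   // only B remains
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

If new subscribers should also receive the current state immediately on subscribing, RxJava's BehaviorSubject may be a closer fit than PublishSubject, since it replays the most recent item to each new subscriber.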

Divers answered Oct 29 '22 03:10
