
Real-World Use Case of RxJava Subject

There are a lot of great articles explaining RxJava, but almost none of them illustrate the concepts with a real-world example.

I basically understand the concept of an RxJava Subject: it is like a pipe, and it is both an Observable and an Observer.

But I don't see what the real-world usage of an RxJava Subject would be in Android development. Could you elaborate on that?

Steve.NayLinAung asked Dec 14 '16 09:12



3 Answers

In my case, it was because I had one Observable that had to wait for an item emitted by another Observable, which was asynchronous because it was an interval.

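import java.util.concurrent.TimeUnit;

import io.vertx.rx.java.RxHelper;
import rx.Observable;
import rx.Scheduler;
import rx.subjects.ReplaySubject;

Scheduler scheduler = RxHelper.scheduler(vertx.getOrCreateContext());

callAnotherObservable(scheduler)
          .subscribe(item -> System.out.println(item));

public Observable<String> callAnotherObservable(Scheduler scheduler) {
          // The subject subscribes to the interval and replays what it received to later subscribers.
          ReplaySubject<String> subject = ReplaySubject.create(1);
          Observable.interval(100, TimeUnit.MILLISECONDS)
          .map(i -> "item to be passed to the other observable")
          .subscribe(subject);
          return subject.observeOn(scheduler).first(); // Here we wait for the first emission of the interval Observable.
}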

Here, as you can see, we use subject.first() to wait for the first emission of the interval Observable, which runs on another thread.

For more examples of "hot" Observables, see https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/connectable/HotObservable.java

paul answered Oct 03 '22 19:10


Subjects have a lot of "real world" applications, particularly when you gradually transform your codebase from an imperative to a reactive style. A Subject can serve as a bridge between these two worlds, letting non-reactive code outside a stream push events into it.

But since you asked for an example: recently I was implementing custom behavior for when the user tries to go back from an activity. RxJava offered a very elegant solution to the problem I was facing, so I needed to compose a stream of events corresponding to the user wanting to go back. I deliberately avoid the phrase "press the back button", because there are several places in the codebase that simulate going back, and they all go through the onBackPressed() method.

Turning all of these into a single stream would require a massive refactoring, which is not in the budget right now. But I didn't want to give up on the RxJava solution, as it could make the code a lot more concise. A BehaviorSubject was the answer: I just needed to emit an event from within the onBackPressed() method.
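To make the bridging idea concrete, here is a minimal sketch. It is not the code from my project, and it deliberately uses only the JDK so it runs without RxJava or Android on the classpath. MiniSubject, EditorScreen, and BackPressBridgeDemo are hypothetical names made up for illustration. MiniSubject is a simplified stand-in that behaves like a PublishSubject-style subject (no replay of the last value, unlike the BehaviorSubject mentioned above), and it is not RxJava's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for an RxJava subject: imperative code can push into it
// (onNext) and interested code can observe it (subscribe).
final class MiniSubject<T> {
    private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

    // Observable side: register a listener for future events.
    void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    // Observer side: push an event to every current subscriber.
    void onNext(T event) {
        for (Consumer<T> observer : observers) {
            observer.accept(event);
        }
    }
}

// Hypothetical screen; on Android this role would be played by an Activity
// overriding onBackPressed().
final class EditorScreen {
    private final MiniSubject<String> backEvents = new MiniSubject<>();

    MiniSubject<String> backEvents() {
        return backEvents;
    }

    // Every "go back" path already funnels through this method, so emitting
    // here turns all of them into one stream without touching the callers.
    void onBackPressed() {
        backEvents.onNext("back");
    }
}

final class BackPressBridgeDemo {
    // Simulates two back requests and returns what the subscriber recorded.
    static List<String> run() {
        EditorScreen screen = new EditorScreen();
        List<String> handled = new ArrayList<>();
        screen.backEvents().subscribe(event -> handled.add("handled " + event));
        screen.onBackPressed(); // e.g. the hardware back button
        screen.onBackPressed(); // e.g. a toolbar arrow routed to the same method
        return handled;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [handled back, handled back]
    }
}
```

With real RxJava, the field would be a subject from the library and the subscriber side could apply operators to the stream; the shape of the bridge (emit inside onBackPressed(), subscribe elsewhere) stays the same.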

koperko answered Oct 03 '22 18:10


I was building a generic ReportDownloadManager for an Android application, where we needed to consume an Observable whose Observer would download and store a file locally. The success or failure of the download had to be handled by the manager, but the manager also had to expose an Observable to the Activities/Services that use it. I believe this was a good use case for a Subject: it both consumes the initial Observable and generates events for the Observable exposed to clients.

import android.app.DownloadManager;
import android.content.Context;
import android.webkit.MimeTypeMap;

import java.io.File;

import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.PublishSubject;

public class ReportDownloadManager {

    private final DownloadManager platformDownloadManager;

    public ReportDownloadManager(Context context) {
        this.platformDownloadManager = (DownloadManager) context.getSystemService(Context.DOWNLOAD_SERVICE);
    }

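    public Observable<Object> download(final File file, DownloadAction downloadAction) {
        // The subject relays the outcome of the download to whoever subscribes to the returned Observable.
        // Note: PublishSubject does not replay, so an event emitted before the caller subscribes is missed.
        final PublishSubject<Object> subject = PublishSubject.create();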
        downloadAction.execute(file.getName())
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        platformDownloadManager.addCompletedDownload(file.getName(), "No description", false,
                                MimeTypeMap.getSingleton().getMimeTypeFromExtension("pdf"), file.getAbsolutePath(),
                                file.length(), true);
                        subject.onNext(new Object());
                        subject.onComplete();
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        subject.onError(throwable);
                    }
                });
        return subject;
    }

    interface DownloadAction {
        Observable<Object> execute(String fileAbsolutePath);
    }

}
Ioannis Sermetziadis answered Oct 03 '22 18:10