
Using RxJava and EventBus to send an event back to the activity/fragment

Android Studio 3.2 Canary 8
com.squareup:otto:1.3.8
io.reactivex:rxjava:1.3.7
kotlin 1.2.31

I am trying to send an event back to my Activity using the otto EventBus.

However, I am using RxJava to perform some background work, and the event needs to be sent after the first operation completes. After I post the event, the activity never receives it.

The event must be delivered on the main thread, but the RxJava work runs on the IO thread. I am not sure what the best way to do this is.

Here is my code for the interactor that does the RxJava work and posts to the event bus:

class Interactors(private val eventBus: Bus) {
    fun transmitMessage(): Completable {
        return insertTransmission()
                .andThen(onTransmissionChanged()) /* Send event to the activity */
                .andThen(requestTransmission())
    }

    private fun insertTransmission(): Completable {
        return Completable.fromCallable {
            Thread.sleep(4000)
            System.out.println("insertTransmission doing some long operation")
        }
    }

    private fun requestTransmission(): Completable {
        return Completable.fromCallable {
            Thread.sleep(2000)
            System.out.println("requestTransmission doing some long operation")
        }
    }

    /* Need to send this event back to the activity/fragment */
    private fun onTransmissionChanged(): Completable {
        return Completable.fromCallable {
            System.out.println("onTransmissionChanged send event to activity")
            eventBus.post(TransmissionChanged())
        }
    }
}

Activity:

public class HomeActivity extends AppCompatActivity {
    private Bus eventBus = new Bus();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_home);

        eventBus.register(this);

        new Interactors(eventBus).transmitMessage()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe();
    }

    @Override
    protected void onDestroy() {
        eventBus.unregister(this);
        super.onDestroy();
    }

    @Subscribe
    public void onTransmissionChangedEvent(TransmissionChanged transmissionChanged) {
        System.out.println("onTransmissionChangedEvent");
    }
}

And the event class:

class TransmissionChanged

This is the output when I run the app:

insertTransmission doing some long operation
onTransmissionChanged

I am not sure whether eventBus.post(..) is blocking. It should really run on the main thread, because it posts back to the Activity to perform an update in the UI.

ant2009 asked Apr 06 '18 17:04


2 Answers

Do you really need to mix an EventBus and RxJava? For me this introduces extra complexity without a lot of benefit to it. Your use-case seems like a perfect example to use an Rx stream, doing some work on each emission (in your case updating the UI via onTransmissionChangedEvent()).

I'd change transmitMessage() method to something like this:

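fun transmitMessage(): Observable<TransmissionChanged> {
    // Note: ObservableOnSubscribe and onComplete() are RxJava 2 names (RxJava 1 uses onCompleted()).
    // This sketch also assumes insertTransmission() and requestTransmission() have been turned into
    // plain blocking functions; as Completables they would do nothing here without being subscribed.
    return Observable.create(ObservableOnSubscribe<TransmissionChanged> { emitter ->
        insertTransmission()

        // Emit once the long-running operation in insertTransmission() has finished
        emitter.onNext(TransmissionChanged())

        requestTransmission()

        // Complete once the long-running operation in requestTransmission() has finished
        emitter.onComplete()
    })
}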

I guess you need some extra data to update your UI accordingly. That data is encapsulated in the TransmissionChanged class, so include whatever you need there. One thing to be aware of: using Observable.create() is dangerous in RxJava 1. I don't remember what the safe way of doing this was, and I don't have a project with RxJava 1 to experiment with, but there was a factory method in the Observable class that could do the job safely.

Using the above, your Activity code becomes cleaner as well. There's no need for Otto anymore, as all your operations are handled via the single Rx stream.

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_home);

    new Interactors()
        .transmitMessage()
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(transmission -> onTransmissionChangedEvent(transmission),
                throwable -> handleError(throwable),
                () -> handleCompletion()
        );
}

Vesko answered Nov 03 '22 00:11


Not allowing the receiver to specify which thread it would like to receive events on is a shortcoming of Otto. It enforces that all calls are made on the same thread (the main thread by default), so it is up to the caller to be on the correct thread. I much prefer EventBus by GreenRobot, where you choose the thread you want to receive on with an annotation. So my first suggestion, if you are not too invested in Otto yet, is to consider using EventBus instead.

If you are not in a position to rework all your event bus code, you can post back to the main looper by allocating a Handler. It is quick and easy, but feels a little like stepping outside the Rx framework.

private fun onTransmissionChanged(): Completable {
    return Completable.fromCallable {
        System.out.println("onTransmissionChanged send event to activity")
        Handler(Looper.getMainLooper()).post {
            eventBus.post(TransmissionChanged())
        }
    }
}

If you are calling this a lot, you may want to cache the Handler and pass it into your Interactors constructor.
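
The thread restriction and the hop back to the main thread can also be modelled outside Android. The following is only a rough JDK-only sketch, not Otto's implementation: ConfinedBus, ConfinedBusDemo and the "main-stand-in" thread are hypothetical names, and a single-thread executor stands in for the main looper and its Handler.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

/** Hypothetical stand-in for a bus that may only be posted to from its owner thread. */
class ConfinedBus {
    private final Thread owner;
    private final List<Consumer<Object>> subscribers = new ArrayList<>();

    ConfinedBus(Thread owner) {
        this.owner = owner;
    }

    void register(Consumer<Object> subscriber) {
        subscribers.add(subscriber);
    }

    void post(Object event) {
        // Reject posts from any thread other than the owner (assumed model of thread enforcement).
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("post() called from " + Thread.currentThread().getName());
        }
        for (Consumer<Object> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}

class ConfinedBusDemo {
    /** Posts once directly from a worker thread and once via the "main" executor; returns the log. */
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        // Single-thread executor playing the role of the main looper.
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-stand-in"));
        try {
            // Create the bus and register the subscriber on the "main" thread, which becomes the owner.
            ConfinedBus bus = main.submit(() -> {
                ConfinedBus b = new ConfinedBus(Thread.currentThread());
                b.register(event ->
                        log.add("received " + event + " on " + Thread.currentThread().getName()));
                return b;
            }).get();

            CountDownLatch delivered = new CountDownLatch(1);
            Thread worker = new Thread(() -> {
                try {
                    bus.post("direct"); // wrong thread, so the bus rejects it
                } catch (IllegalStateException rejected) {
                    log.add("rejected on " + Thread.currentThread().getName());
                }
                // Hop to the "main" thread first, analogous to Handler(mainLooper).post { ... }.
                main.execute(() -> {
                    bus.post("hopped");
                    delivered.countDown();
                });
            }, "worker");
            worker.start();
            delivered.await();
            return new ArrayList<>(log);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main should print "rejected on worker" followed by "received hopped on main-stand-in": the direct post from the background thread is refused, while the post that is re-dispatched to the owner thread reaches the subscriber.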

If you want to stick with RxJava schedulers, you can pass a Scheduler into your constructor to indicate where you want to do your background work, instead of using subscribeOn at the call site. In transmitMessage, use it to schedule the background operations while forcing eventBus.post onto the main thread, as follows:

class Interactors(private val eventBus: Bus, private val scheduler: Scheduler) {
    fun transmitMessage(): Completable {
        return insertTransmission()
                .subscribeOn(scheduler)
                .observeOn(AndroidSchedulers.mainThread())
                .andThen(onTransmissionChanged()) /* Send event to the activity */
                .observeOn(scheduler)
                .andThen(requestTransmission())
    }
    // Rest of the class is unchanged

}

In this case, you would use it in HomeActivity as follows:

new Interactors(eventBus, Schedulers.io()).transmitMessage()
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe();
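
The order of thread hops in that chain (background, main, background) can be illustrated without RxJava. The sketch below swaps in java.util.concurrent.CompletableFuture for Completable and two single-thread executors for Schedulers.io() and AndroidSchedulers.mainThread(); SchedulerHopDemo and the thread names are hypothetical, and the log lines only mirror the method names from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SchedulerHopDemo {
    /** Runs three steps in sequence, each on an explicitly chosen executor, and returns the log. */
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        // Stand-ins for the background scheduler and the main-thread scheduler.
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-stand-in"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-stand-in"));
        try {
            CompletableFuture
                    // Background work first (role of subscribeOn(scheduler)).
                    .runAsync(() -> log.add("insertTransmission on " + threadName()), io)
                    // Hop to the main thread for the event post (role of observeOn(mainThread())).
                    .thenRunAsync(() -> log.add("post event on " + threadName()), main)
                    // Hop back to the background for the remaining work (role of observeOn(scheduler)).
                    .thenRunAsync(() -> log.add("requestTransmission on " + threadName()), io)
                    .join();
        } finally {
            io.shutdown();
            main.shutdown();
        }
        return new ArrayList<>(log);
    }

    private static String threadName() {
        return Thread.currentThread().getName();
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each step names its executor explicitly, so the middle step always runs on "main-stand-in" and the other two on "io-stand-in", which is the same shape as the subscribeOn/observeOn chain above.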

iagreen answered Nov 02 '22 22:11