Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Making a RxJava async task thread safe

In my app, I have a service that track the user location then send it to the server using RxJava. If the request succeed, I'm receiving back the inserted id, then I could delete them from my local database.

  1. Query the database for the points to send
  2. If not empty, I post all the gathered point from the database
  3. If the request succeed, I delete from the database the posted points

My issue is, I'm calling that observable quickly before the previous task has ended so the server receive duplicates points (same point for both request). I would need to perform the Observable on a Single Thread to avoid having another task querying the database before the previous one has ended. I create a Looper thread but still, i'm sending duplicates and I dont know why. The server request seems now to wait until it ends before executing the next one, but still, on the next request, it's sending the same points! Gahh

    final StoreChangeEvent finalEvent = event;
    Observable
            .defer(() -> Observable.just(database.getAllPoints()))
            .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
                    .map(result -> deletePoint(result))
                    .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
                    .doOnCompleted(() -> emitStoreChange(finalEvent))
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(AndroidSchedulers.from(backgroundLooper)))
            .subscribe();

It seems like database.getAllPoints() is called too soon... Should I add a .blocking()?

Let's say I have 5 point to post to the server (A,B,C,D)

  1. I query the DB and send A-B-C-D to the server
  2. I reveive another point from the device (point E)
  3. I query the DB and send (A-B-C-D-E)
  4. I receive success from the server, then I delete A-B-C-D from the local DB
  5. I receive success from the second request, then I delete A-B-C-D-E from the local DB

Result : A-B-C-D are there twice on the server database because two request was sent with the same point

like image 965
Jaythaking Avatar asked Jan 01 '26 15:01

Jaythaking


1 Answers

Option 1 - Subject

You can try using Subject - a thing that acts both as a observer and an observable.

Subscribe to it once and notify it on new events (onNext()) from other places. The subscriber will handle the events subsequently.

I used SerializedSubject in case you'll call notifyNewEvent() from different threads, otherwise you're okay using BehaviourSubject.

SerializedSubject<StoreChangeEvent, StoreChangeEvent> subject = new SerializedSubject(BehaviorSubject.create())

public void initialize() {
    // Since you access your incoming event from doOnCompleted,
    // need this extra flatMap function so that you can access your event
    // outside rx java chain.
    subject.flatMap(new Func1() {
        @Override
        public Observable call(StoreChangeEvent event) {
            return Observable
                    .just(database.getAllPoints())
                    .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
                            .map(result -> deletePoint(result))
                            .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
                            .doOnCompleted(() -> emitStoreChange(finalEvent)));
        }
    })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(AndroidSchedulers.from(backgroundLooper))
            .subscribe();
}

public void notifyNewEvent(StoreChangeEvent event) {
    subject.onNext(event);
}

Option 2 - Executors

If you don't access UI, why even bother with subjects and observeOn, subscribeOn. Create an executor with one thread (all tasks are executed subsequently), and submit the tasks to it, leveraging RxJava useful capabilities there.

ExecutorService executorService = Executors.newSingleThreadExecutor();

public void notifyNewEvent(final StoreChangeEvent event) {
    executorService.execute(new Runnable() {
        public void run() {
            Observable.just(database.getAllPoints())
                // Blocking, so that the thread doesn't exit
                // and blocks on subscribe() till completion.
                .toBlocking() 
                .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
                    .map(result -> deletePoint(result))
                    .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
                    .doOnCompleted(() -> emitStoreChange(finalEvent)))
                .subscribe();
        }
    });
}
like image 186
Artem Novikov Avatar answered Jan 03 '26 05:01

Artem Novikov



Donate For Us

If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!