In my app, I have a service that tracks the user's location and sends it to the server using RxJava. If the request succeeds, I receive the inserted IDs back and can then delete those points from my local database.
My issue is that I'm calling that observable again before the previous task has ended, so the server receives duplicate points (the same points in both requests). I need to run the Observable on a single thread so that another task cannot query the database before the previous one has finished. I created a Looper thread, but I'm still sending duplicates and I don't know why. Each server request now seems to wait for the previous one to finish, but the next request still sends the same points!
final StoreChangeEvent finalEvent = event;
Observable
    .defer(() -> Observable.just(database.getAllPoints()))
    .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
        .map(result -> deletePoint(result))
        .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
        .doOnCompleted(() -> emitStoreChange(finalEvent))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(AndroidSchedulers.from(backgroundLooper)))
    .subscribe();
It seems like database.getAllPoints() is called too soon... Should I add a .blocking()?
Let's say I have four points to post to the server (A, B, C, D).
Result: A, B, C and D are each in the server database twice, because two requests were sent with the same points.
You can try using a Subject, which acts as both an observer and an observable.
Subscribe to it once and notify it of new events (via onNext()) from other places. The subscriber will handle the events one after another.
I used SerializedSubject in case you call notifyNewEvent() from different threads; otherwise you're fine using a plain BehaviorSubject.
SerializedSubject<StoreChangeEvent, StoreChangeEvent> subject =
        new SerializedSubject<>(BehaviorSubject.<StoreChangeEvent>create());

public void initialize() {
    // The incoming event is the argument of this outer flatMap function,
    // so the inner chain (including doOnCompleted) can reference it directly.
    subject.flatMap(new Func1<StoreChangeEvent, Observable<?>>() {
        @Override
        public Observable<?> call(StoreChangeEvent event) {
            return Observable
                .just(database.getAllPoints())
                .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
                    .map(result -> deletePoint(result))
                    .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
                    // Use the event passed to call(); finalEvent does not exist in this scope.
                    .doOnCompleted(() -> emitStoreChange(event)));
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeOn(AndroidSchedulers.from(backgroundLooper))
    .subscribe();
}

public void notifyNewEvent(StoreChangeEvent event) {
    subject.onNext(event);
}
If you don't access the UI, why even bother with subjects, observeOn and subscribeOn? Create an executor with a single thread (so all tasks are executed sequentially) and submit the tasks to it, still leveraging RxJava's useful capabilities inside each task.
ExecutorService executorService = Executors.newSingleThreadExecutor();

public void notifyNewEvent(final StoreChangeEvent event) {
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            Observable.just(database.getAllPoints())
                .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
                    .map(result -> deletePoint(result))
                    .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
                    .doOnCompleted(() -> emitStoreChange(event)))
                // Blocking, so that run() does not return until the chain
                // completes. BlockingObservable has no flatMap, so
                // toBlocking() has to come after the operators.
                .toBlocking()
                .subscribe();
        }
    });
}
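To see why a single-threaded executor removes the duplicates, here is a minimal plain-Java sketch without RxJava or Android. Everything in it is a hypothetical stand-in, not your real classes: `SerialUploadSketch`, the `pending` list (playing the role of the local database) and the `server` list (playing the role of the backend) are made up for illustration. It assumes each read, post and delete cycle runs to completion inside one task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the location service; names are illustrative only.
class SerialUploadSketch {
    // Stand-in for the local database of points not yet uploaded.
    private final List<String> pending = new CopyOnWriteArrayList<>();
    // Stand-in for the server: records every point it receives.
    final List<String> server = new CopyOnWriteArrayList<>();
    // One worker thread: tasks run one at a time, in submission order.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void addPoint(String point) {
        pending.add(point);
    }

    // Queue one complete read -> post -> delete cycle.
    void notifyNewEvent() {
        executor.execute(() -> {
            List<String> batch = new ArrayList<>(pending); // plays the role of database.getAllPoints()
            if (batch.isEmpty()) {
                return;                                    // nothing left to send
            }
            server.addAll(batch);                          // plays the role of postAmazonPoints(batch)
            pending.removeAll(batch);                      // plays the role of deletePoint(result)
        });
    }

    // Stop accepting tasks and wait for the queued cycles to finish.
    boolean shutdownAndWait() {
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        SerialUploadSketch sketch = new SerialUploadSketch();
        for (String p : new String[] {"A", "B", "C", "D"}) {
            sketch.addPoint(p);
        }
        sketch.notifyNewEvent();
        sketch.notifyNewEvent(); // queued right away, possibly before the first cycle has finished
        sketch.shutdownAndWait();
        System.out.println(sketch.server); // prints [A, B, C, D]
    }
}
```

The second task only starts reading after the first one has deleted its batch, so it finds nothing to send. The same guarantee holds in the RxJava version above only as long as the chain inside run() blocks until the post and delete have finished.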