
Transactions in Android Room w/ RxJava2

A requirement of my application is to let the user progress through multiple steps and then, upon completion, write values to the database based on the entries in each step. Each step in the UI may contribute operations that need to be written to the database. The data may span multiple tables and pertain to different rows in those tables. If any of the database operations fails, the entire operation should fail.

I initially considered loading all of the data into memory, manipulating it, then simply calling the update methods in every possible entity (with a conflict strategy of REPLACE), but there could be an extremely large amount of data in memory.

I figured I could instead assemble a List of Completables, where each Fragment in the display contributes one or more Completables, then execute those sequentially using Completable.concat() at the end of the UI flow. It would look something like the code below:

    Completable one = Completable.fromAction(() -> Log.w(LOG_TAG, "(1)")).delay(1, TimeUnit.SECONDS);
    Completable two = Completable.fromAction(() -> Log.w(LOG_TAG, "(2)")).delay(2, TimeUnit.SECONDS);
    Completable three = Completable.fromAction(() -> Log.w(LOG_TAG, "(3)")).delay(3, TimeUnit.SECONDS);
    Completable four = Completable.fromAction(() -> Log.w(LOG_TAG, "(4)")).delay(3, TimeUnit.SECONDS);

    Completable.concatArray(one, two, three, four)
            .doOnSubscribe(__ -> {
                mRoomDatabase.beginTransaction();
            })
            .doOnComplete(() -> {
                mRoomDatabase.setTransactionSuccessful();
            })
            .doFinally(() -> {
                mRoomDatabase.endTransaction();
            })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(Schedulers.io())
            .subscribe();

The Completables would actually be wrappers around Room DAO insert/update/delete methods. I would likely also perform a UI operation upon completion which is why I'm observing on the main thread.

When I execute this code, I get these logs:

W/MyPresenter: Begin transaction.
W/MyPresenter: (1)
W/MyPresenter: (2)
W/MyPresenter: (3)
W/MyPresenter: (4)
W/MyPresenter: Set transaction successful.
W/MyPresenter: End transaction.
W/System.err: java.lang.IllegalStateException: Cannot perform this operation because there is no current transaction.
W/System.err:     at android.database.sqlite.SQLiteSession.throwIfNoTransaction(SQLiteSession.java:915)
W/System.err:     at android.database.sqlite.SQLiteSession.endTransaction(SQLiteSession.java:398)
W/System.err:     at android.database.sqlite.SQLiteDatabase.endTransaction(SQLiteDatabase.java:524)
W/System.err:     at android.arch.persistence.db.framework.FrameworkSQLiteDatabase.endTransaction(FrameworkSQLiteDatabase.java:88)
W/System.err:     at android.arch.persistence.room.RoomDatabase.endTransaction(RoomDatabase.java:220)
W/System.err:     at ...lambda$doTest$22$MyPresenter(MyPresenter.java:490)

Why is the transaction gone by the time I reach doFinally? I also welcome any comments on the quality or feasibility of this approach as I'm quite new to RxJava and Room.

Steve asked Mar 01 '18 21:03



2 Answers

By logging the current thread and perusing Android developer documentation I think I might finally understand what I'm doing wrong.

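1) A transaction must begin and end on the same thread. That's why it's telling me there's no transaction: I'm bouncing between threads.

2) The doOnSubscribe, doOnComplete, and doFinally callbacks are side effects, and they do not all run on the scheduler I pass to subscribeOn. doOnSubscribe runs on the thread that performs the subscription, whereas doOnComplete and doFinally run on whichever thread delivers the terminal signal: the scheduler of the nearest observeOn above them or, if there is none, the thread the source emits on. In my test, delay() hands completion over to the computation scheduler by default, so the transaction began on an io thread and was ended on a different one.

3) Because I want to receive results on the UI thread upon completion, but want the side effects to occur on a background thread, I need to change where in the chain I call observeOn.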

    Completable.concatArray(one, two, three, four)
            .observeOn(Schedulers.single()) // OFF UI THREAD: the callbacks below run on the single scheduler
            .doOnSubscribe(__ -> {
                Log.w(LOG_TAG, "Begin transaction. " + Thread.currentThread().toString());
                mRoomDatabase.beginTransaction();
            })
            .doOnComplete(() -> {
                Log.w(LOG_TAG, "Set transaction successful." + Thread.currentThread().toString());
                mRoomDatabase.setTransactionSuccessful();
            })
            .doFinally(() -> {
                Log.w(LOG_TAG, "End transaction." + Thread.currentThread().toString());
                mRoomDatabase.endTransaction();
            })
            .subscribeOn(Schedulers.single())
            .observeOn(AndroidSchedulers.mainThread()) // ON UI THREAD: the observer below runs on main
            .subscribeWith(new CompletableObserver() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.w(LOG_TAG, "onSubscribe." + Thread.currentThread().toString());
                }

                @Override
                public void onComplete() {
                    Log.w(LOG_TAG, "onComplete." + Thread.currentThread().toString());
                }

                @Override
                public void onError(Throwable e) {
                    // Pass the throwable along so its stack trace is logged too
                    Log.e(LOG_TAG, "onError." + Thread.currentThread().toString(), e);
                }
            });

The logging statements now look like this:

W/MyPresenter: onSubscribe.Thread[main,5,main]
W/MyPresenter: Begin transaction. Thread[RxSingleScheduler-1,5,main]
W/MyPresenter: (1)
W/MyPresenter: (2)
W/MyPresenter: (3)
W/MyPresenter: (4)
W/MyPresenter: Set transaction successful.Thread[RxSingleScheduler-1,5,main]
W/MyPresenter: End transaction.Thread[RxSingleScheduler-1,5,main]
W/MyPresenter: onComplete.Thread[main,5,main]

I believe this accomplishes what I'm after but it remains to be seen if the step-by-step assembly of Room-based RxJava Completables will work out. I will keep an eye out for any comments/answers and may report back for posterity.
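To make point 1 concrete without Android or RxJava, here is a minimal plain-Java sketch. ThreadBoundDb is a hypothetical stand-in (not a Room or SQLite class) that keeps its "in transaction" flag in a ThreadLocal, which is only a rough model of how the real session state is tied to the calling thread. The executors play the role of the schedulers.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for a database whose transaction state is tracked per thread. */
final class ThreadBoundDb {
    // Each thread sees its own flag, so a transaction begun on one thread is invisible to another.
    private final ThreadLocal<Boolean> inTransaction = ThreadLocal.withInitial(() -> false);

    void beginTransaction() {
        inTransaction.set(true);
    }

    void endTransaction() {
        if (!inTransaction.get()) {
            throw new IllegalStateException("no current transaction on this thread");
        }
        inTransaction.set(false);
    }
}

final class ThreadConfinementDemo {

    /** Begins on one executor and ends on another; returns whether endTransaction() succeeded. */
    static boolean beginAndEnd(ExecutorService beginOn, ExecutorService endOn) {
        ThreadBoundDb db = new ThreadBoundDb();
        Runnable begin = db::beginTransaction;
        Runnable end = db::endTransaction;
        try {
            beginOn.submit(begin).get();
            endOn.submit(end).get();
            return true;
        } catch (ExecutionException e) {
            return false; // endTransaction() threw on the worker thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Both calls run on the one worker thread of a single-thread executor. */
    static boolean sameThread() {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            return beginAndEnd(single, single);
        } finally {
            single.shutdown();
        }
    }

    /** The calls run on two separate worker threads. */
    static boolean differentThreads() {
        ExecutorService first = Executors.newSingleThreadExecutor();
        ExecutorService second = Executors.newSingleThreadExecutor();
        try {
            return beginAndEnd(first, second);
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("same thread: " + sameThread());
        System.out.println("different threads: " + differentThreads());
    }
}
```

Running main prints "same thread: true" followed by "different threads: false". The second case fails in the same way as my original chain, where begin and end landed on different threads.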

Steve answered Sep 30 '22 12:09



I managed to run transactions spanning operations on different tables this way (the example uses Dagger to inject the database):

class RxRoomTransaction @Inject constructor(private val db : AppDatabase) {

    fun run(fn : () -> Unit) : Completable {
        return Completable.fromAction {
            try {
                db.beginTransaction()
                fn.invoke()
                db.setTransactionSuccessful()
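            } catch (t : Throwable) {
                // Catch everything, including the InterruptedException thrown on dispose.
                // The transaction is rolled back because setTransactionSuccessful() was not reached,
                // but note that the returned Completable still completes normally.
            } finally {
                try {
                    // Guard against an exception from endTransaction() itself (shouldn't occur)
                    db.endTransaction()
                } catch (t : Throwable) {
                }
            }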
            }
        }
    }

}

Calling it this way:

rxRoomTransaction.run {
    dao1.insertAll(data1)
    dao2.insert(data2)
    dao3.clear()
}

The DAO methods do NOT return RxJava types:

@Dao
interface Dao3 {
    @Query("DELETE FROM table3")
    fun clear()
}
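
The wrapper above swallows failures on purpose, so the caller never learns that the transaction was rolled back. If the caller should be told, one alternative is to let the exception escape after the transaction has ended. The plain-Java sketch below shows that shape. FakeDb and TransactionRunner are hypothetical stand-ins written for illustration, not Room classes. Room's own RoomDatabase.runInTransaction(Runnable) has a similar shape, and if such a helper were wrapped in Completable.fromAction, the rethrown exception should surface in onError.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory stand-in for a database with begin / setSuccessful / end semantics. */
final class FakeDb {
    private final List<String> committed = new ArrayList<>();
    private List<String> pending;
    private boolean successful;

    void beginTransaction() {
        pending = new ArrayList<>(committed); // work on a copy until the transaction ends
        successful = false;
    }

    void insert(String row) {
        pending.add(row);
    }

    void setTransactionSuccessful() {
        successful = true;
    }

    void endTransaction() {
        if (successful) { // commit only if the body ran to completion
            committed.clear();
            committed.addAll(pending);
        }
        pending = null; // otherwise the pending changes are simply dropped (rollback)
    }

    List<String> rows() {
        return new ArrayList<>(committed);
    }
}

final class TransactionRunner {

    /** Runs body in one transaction; an exception rolls back and is rethrown to the caller. */
    static void inTransaction(FakeDb db, Runnable body) {
        db.beginTransaction();
        try {
            body.run();
            db.setTransactionSuccessful();
        } finally {
            db.endTransaction();
        }
    }

    /** Two inserts that both succeed: both rows are committed. */
    static List<String> demoSuccess() {
        FakeDb db = new FakeDb();
        inTransaction(db, () -> {
            db.insert("a");
            db.insert("b");
        });
        return db.rows();
    }

    /** The second step fails: the first insert is rolled back and the caller sees the error. */
    static List<String> demoFailure() {
        FakeDb db = new FakeDb();
        try {
            inTransaction(db, () -> {
                db.insert("a");
                throw new IllegalStateException("step 2 failed");
            });
        } catch (IllegalStateException expected) {
            // The failure reached the caller instead of being swallowed.
        }
        return db.rows();
    }

    public static void main(String[] args) {
        System.out.println("after success: " + demoSuccess());
        System.out.println("after failure: " + demoFailure());
    }
}
```

The trade-off is that the caller has to handle the error, including the interruption that disposal can trigger, which is the case the original wrapper was written to absorb.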
Marek Teuchner answered Sep 30 '22 13:09
