How do I create a PagedList of an object for tests?


I have been working with Google's Architecture Components libraries, and one thing that has made testing difficult is PagedList.

For this example, I am using the repository pattern and returning data from either a network API or the local database.

So within the ViewModel I make a call to this interface method:

    override fun getFoos(): Observable<PagedList<Foo>>

The repository then uses RxPagedListBuilder to create an Observable that emits a PagedList<Foo>:

    override fun getFoos(): Observable<PagedList<Foo>> =
        RxPagedListBuilder(database.fooDao().selectAll(), PAGED_LIST_CONFIG).buildObservable()

In tests, I want to be able to set up the value returned by these methods that return a PagedList<Foo>, for example by stubbing getFoos() to emit a prebuilt list.

Two questions:

  1. Is this possible?
  2. How do I create a PagedList<Foo>?

My goal is to verify in a more end-to-end fashion (such as ensuring that the correct list of Foos is displayed on the screen). The fragment/activity/view is the one observing the PagedList<Foo> from a ViewModel.

1 Answer

Paging 3

The Paging 3 library offers a factory method, PagingData.from(someList), which wraps a static list in a PagingData.
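
As a minimal sketch of how PagingData.from could be used in a test double: the example assumes a Paging 3 style repository that exposes Flow<PagingData<Foo>> instead of the question's Observable<PagedList<Foo>>. The names Foo, FooRepository, and FakeFooRepository are hypothetical and only illustrate the idea.

```kotlin
import androidx.paging.PagingData
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf

// Hypothetical model and repository contract, named after the question's Foo.
data class Foo(val id: Int)

interface FooRepository {
    fun getFoos(): Flow<PagingData<Foo>>
}

// Test double: emits a single PagingData backed by a fixed in-memory list.
class FakeFooRepository(private val foos: List<Foo>) : FooRepository {
    override fun getFoos(): Flow<PagingData<Foo>> = flowOf(PagingData.from(foos))
}
```

A test can then hand FakeFooRepository(listOf(Foo(1), Foo(2))) to the ViewModel under test. To read the items back for assertions, the androidx.paging:paging-testing artifact offers an asSnapshot() extension on Flow<PagingData<T>>, provided that dependency is available in the project.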

Paging 2

Convert List Into PagedList With a Mock DataSource.Factory.

@saied89 shared this solution in this googlesamples/android-architecture-components issue. I've implemented the mocked PagedList in the Coinverse Open App to unit-test a ViewModel locally using Kotlin, JUnit 5, MockK, and AssertJ.

To observe the LiveData from the PagedList I've used Jose Alcérreca's implementation of getOrAwaitValue from the LiveDataSample sample app under Google's Android Architecture Components samples.

The asPagedList extension function is implemented in the sample test ContentViewModelTest.kt below.


    import android.database.Cursor
    import androidx.paging.Config
    import androidx.paging.DataSource
    import androidx.paging.LivePagedListBuilder
    import androidx.paging.PagedList
    import androidx.room.RoomDatabase
    import androidx.room.RoomSQLiteQuery
    import androidx.room.paging.LimitOffsetDataSource
    import io.mockk.every
    import io.mockk.mockk

    // Builds a PagedList from a plain List and waits for the LiveData to emit it.
    fun <T> List<T>.asPagedList() = LivePagedListBuilder<Int, T>(createMockDataSourceFactory(this),
        Config(enablePlaceholders = false,
                prefetchDistance = 24,
                pageSize = if (size == 0) 1 else size))
        .build().getOrAwaitValue()

    private fun <T> createMockDataSourceFactory(itemList: List<T>): DataSource.Factory<Int, T> =
        object : DataSource.Factory<Int, T>() {
            override fun create(): DataSource<Int, T> = MockLimitDataSource(itemList)
        }

    // Room collaborators are mocked so LimitOffsetDataSource can be constructed without a real database.
    private val mockQuery = mockk<RoomSQLiteQuery> {
        every { sql } returns ""
    }

    private val mockDb = mockk<RoomDatabase> {
        every { invalidationTracker } returns mockk(relaxUnitFun = true)
    }

    class MockLimitDataSource<T>(private val itemList: List<T>) : LimitOffsetDataSource<T>(mockDb, mockQuery, false, null) {
        override fun convertRows(cursor: Cursor?): MutableList<T> = itemList.toMutableList()
        override fun countItems(): Int = itemList.count()
        override fun isInvalid(): Boolean = false
        override fun loadRange(params: LoadRangeParams, callback: LoadRangeCallback<T>) { /* Not implemented */ }

        override fun loadRange(startPosition: Int, loadCount: Int) =
            itemList.subList(startPosition, startPosition + loadCount).toMutableList()

        // Hands the whole list back as the initial page, starting at position 0.
        override fun loadInitial(params: LoadInitialParams, callback: LoadInitialCallback<T>) {
            callback.onResult(itemList, 0)
        }
    }
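
To connect this back to the question, here is a rough sketch of stubbing a repository so that getFoos() emits a prebuilt PagedList. It assumes RxJava 2 (io.reactivex.Observable) and MockK; Foo, FooRepository, and stubbedRepository are hypothetical names used only for illustration.

```kotlin
import androidx.paging.PagedList
import io.mockk.every
import io.mockk.mockk
import io.reactivex.Observable

// Hypothetical model and repository contract, mirroring the question.
data class Foo(val id: Int)

interface FooRepository {
    fun getFoos(): Observable<PagedList<Foo>>
}

// Returns a mocked repository whose getFoos() emits the given PagedList once.
fun stubbedRepository(pagedList: PagedList<Foo>): FooRepository =
    mockk<FooRepository> {
        every { getFoos() } returns Observable.just(pagedList)
    }
```

In a test, the list would typically come from the asPagedList extension, for example stubbedRepository(listOf(Foo(1), Foo(2)).asPagedList()). Because asPagedList goes through LiveData, this likely needs a synchronous task executor to be installed first.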


    import androidx.lifecycle.LiveData
    import androidx.lifecycle.Observer
    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.TimeoutException

    /**
     * Gets the value of a [LiveData] or waits for it to have one, with a timeout.
     *
     * Use this extension from host-side (JVM) tests. It's recommended to use it alongside
     * `InstantTaskExecutorRule` or a similar mechanism to execute tasks synchronously.
     */
    fun <T> LiveData<T>.getOrAwaitValue(
        time: Long = 2,
        timeUnit: TimeUnit = TimeUnit.SECONDS,
        afterObserve: () -> Unit = {}
    ): T {
        var data: T? = null
        val latch = CountDownLatch(1)
        val observer = object : Observer<T> {
            override fun onChanged(o: T?) {
                data = o
                latch.countDown()
                this@getOrAwaitValue.removeObserver(this)
            }
        }
        this.observeForever(observer)

        afterObserve.invoke()

        // Don't wait indefinitely if the LiveData is not set.
        if (!latch.await(time, timeUnit)) {
            this.removeObserver(observer)
            throw TimeoutException("LiveData value was never set.")
        }

        @Suppress("UNCHECKED_CAST")
        return data as T
    }


    import androidx.paging.PagedList
    import com.google.firebase.Timestamp
    import io.mockk.*
    import org.assertj.core.api.Assertions.assertThat
    import org.junit.jupiter.api.AfterAll
    import org.junit.jupiter.api.BeforeAll
    import org.junit.jupiter.api.BeforeEach
    import org.junit.jupiter.api.Test
    import org.junit.jupiter.api.extension.ExtendWith

    @ExtendWith(InstantExecutorExtension::class)
    class ContentViewModelTest {
        val timestamp = getTimeframe(DAY)

        @BeforeAll
        fun beforeAll() {
            // ...
        }

        @BeforeEach
        fun beforeEach() {
            // ...
        }

        @AfterAll
        fun afterAll() {
            // ...
        }

        @Test
        fun `Feed Load`() {
            val content = Content("85", 0.0, Enums.ContentType.NONE, Timestamp.now(), "",
                "", "", "", "", "", "", MAIN,
                0, 0.0, 0.0, 0.0, 0.0,
                0.0, 0.0, 0.0, 0.0)
            // Stub the repository call so it emits a result wrapping the mocked PagedList.
            every {
                getMainFeedList(any(), any())
            } returns liveData {
                // ...
                        pagedList = liveData { emit(listOf(content).asPagedList()) },
                        errorMessage = ""))
            }
            val contentViewModel = ContentViewModel(ContentRepository)
            contentViewModel.processEvent(ContentViewEvent.FeedLoad(MAIN, DAY, timestamp, false))
            // ...
                        visibility = GONE,
                        titleRes = app_name,
                        isSupportActionBarEnabled = false))
            verify {
                getMainFeedList(any(), any())
            }
        }
    }


InstantExecutorExtension is required for JUnit 5 when using LiveData: it makes Architecture Components tasks run synchronously on the test thread, the same job InstantTaskExecutorRule does for JUnit 4. Below is Jeroen Mols' implementation.

    import androidx.arch.core.executor.ArchTaskExecutor
    import androidx.arch.core.executor.TaskExecutor
    import org.junit.jupiter.api.extension.AfterEachCallback
    import org.junit.jupiter.api.extension.BeforeEachCallback
    import org.junit.jupiter.api.extension.ExtensionContext

    class InstantExecutorExtension : BeforeEachCallback, AfterEachCallback {
        // Run every task immediately on the calling thread for the duration of each test.
        override fun beforeEach(context: ExtensionContext?) {
            ArchTaskExecutor.getInstance().setDelegate(object : TaskExecutor() {
                override fun executeOnDiskIO(runnable: Runnable) = runnable.run()
                override fun postToMainThread(runnable: Runnable) = runnable.run()
                override fun isMainThread(): Boolean = true
            })
        }

        // Restore the default executor so other tests are unaffected.
        override fun afterEach(context: ExtensionContext?) {
            ArchTaskExecutor.getInstance().setDelegate(null)
        }
    }
