Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I consume paginated resource using Monix in Scala?

Tags:

scala

monix

I have a paginated resource and I want to consume it recursively with Monix. I want to have an Observable that is going to emit downloaded elements and recursively consume pages. Here is a simple example. It doesn't work of course. It emits first page, then first page + second page, then first + second + third. And I want it to emit first, then second, then third and so on.

object Main extends App {

  sealed trait Event
  case class Loaded(xs: Seq[String]) extends Event
  // probably should just finish stream instead of this event
  case object Done extends Event

  // here is the problem
  def consume(page: Int, size: Int):Observable[Event] = {
    Observable.fromFuture(getPaginatedResource(page, size)).concatMap{ xs =>
      if (xs.isEmpty) Observable.pure(Done)
      else Observable.concat(Observable.pure(Loaded(xs)), consume(page + 1, size + 5))
    }
  }

  def getPaginatedResource(page: Int, size: Int):Future[Seq[String]] = Future {
    if (page * size > 100) Seq.empty
    else 0 to size map (x => s"element $x")
  }

  consume(page = 0, size = 5).foreach(println)

}

Any ideas?

UPD Sorry, it seems like it's working and I just have a bug size + 5. So it seems like problem is solved, but if you see that I'm doing something wrong, please, tell me.

like image 983
Artem Malinko Avatar asked Nov 08 '22 19:11

Artem Malinko


1 Answers

It's generally recommended to avoid recursion if possible whenever using Observable. Since, its not easy to visualize and is generally more prone to error.

One idea would be to use scanEvalF since it will emit items on each step.

sealed trait Event
object Event {
  case class Loaded(page: Int, size: Int, items: Seq[String]) extends Event
}

def getPaginatedResource(page: Int, size: Int): Task[Loaded] = Task.pure {
  if (page * size > 100) Loaded(page, size, Seq.empty)
  else Loaded(page, size, 0.to(size).map(x => s"element $x"))
}

def consume(page: Int, size: Int): Observable[Event] = {
  Observable
    .interval(0.seconds)
    .scanEvalF(getPaginatedResource(page, size)) { (xs, _) =>
      getPaginatedResource(xs.page + 1, xs.size + 5)
    } // will emit items on each step
    .takeWhileInclusive(_.items.nonEmpty) // only take until list is empty
}

consume(0, 5)
  .foreachL(println)
  .runToFuture
like image 136
atl Avatar answered Nov 14 '22 21:11

atl