I have a paginated resource and I want to consume it recursively with Monix. I want to have an Observable that is going to emit downloaded elements and recursively consume pages. Here is a simple example. It doesn't work of course. It emits first page, then first page + second page, then first + second + third. And I want it to emit first, then second, then third and so on.
object Main extends App {
sealed trait Event
case class Loaded(xs: Seq[String]) extends Event
// probably should just finish stream instead of this event
case object Done extends Event
// here is the problem
def consume(page: Int, size: Int):Observable[Event] = {
Observable.fromFuture(getPaginatedResource(page, size)).concatMap{ xs =>
if (xs.isEmpty) Observable.pure(Done)
else Observable.concat(Observable.pure(Loaded(xs)), consume(page + 1, size + 5))
}
}
def getPaginatedResource(page: Int, size: Int):Future[Seq[String]] = Future {
if (page * size > 100) Seq.empty
else 0 to size map (x => s"element $x")
}
consume(page = 0, size = 5).foreach(println)
}
Any ideas?
UPD
Sorry, it seems like it's working and I just have a bug size + 5
. So it seems like problem is solved, but if you see that I'm doing something wrong, please, tell me.
It's generally recommended to avoid recursion if possible whenever using Observable
. Since, its not easy to visualize and is generally more prone to error.
One idea would be to use scanEvalF
since it will emit items on each step.
sealed trait Event
object Event {
case class Loaded(page: Int, size: Int, items: Seq[String]) extends Event
}
def getPaginatedResource(page: Int, size: Int): Task[Loaded] = Task.pure {
if (page * size > 100) Loaded(page, size, Seq.empty)
else Loaded(page, size, 0.to(size).map(x => s"element $x"))
}
def consume(page: Int, size: Int): Observable[Event] = {
Observable
.interval(0.seconds)
.scanEvalF(getPaginatedResource(page, size)) { (xs, _) =>
getPaginatedResource(xs.page + 1, xs.size + 5)
} // will emit items on each step
.takeWhileInclusive(_.items.nonEmpty) // only take until list is empty
}
consume(0, 5)
.foreachL(println)
.runToFuture
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With