
How to deal with source that emits Future[T]?

Let's say I have some iterator:

val nextElemIter: Iterator[Future[Int]] = Iterator.continually(...)


And I want to build a source from that iterator:

val source: Source[Future[Int], NotUsed] =
  Source.fromIterator(() => nextElemIter)


So now my source emits Futures. I have never seen Futures passed between stages in the Akka docs or anywhere else, so I could instead do something like this:

val source: Source[Int, NotUsed] = 
  Source.fromIterator(() => nextElemIter).mapAsync(1)(identity /* was n => n */)


And now I have a regular source that emits T instead of Future[T]. But this feels hacky and wrong.

What's the proper way to deal with such situations?

Ori Popowski asked Sep 28 '16 09:09



1 Answer

Answering your question directly: I agree with Vladimir's comment that there is nothing "hacky" about using mapAsync for the purpose you described. I can't think of any more direct way to unwrap the Future from around your underlying Int values.

Answering your question indirectly...

Try to stick with Futures

Streams, as a concurrency mechanism, are incredibly useful when backpressure is required. However, pure Future operations have their place in applications as well.

If your Iterator[Future[Int]] is going to produce a known, limited number of Future values, then you may want to stick with Futures for concurrency.

Imagine you want to filter, map, and reduce the Int values.

def isGoodInt(i : Int) : Boolean = ???         //filter
def transformInt(i : Int) : Int = ???          //map
def combineInts(i : Int, j : Int) : Int = ???  //reduce

Futures provide a direct way of using these functions:

// requires an implicit ExecutionContext in scope
val finalVal : Future[Int] = 
  Future
    .sequence(nextElemIter.toList)  //launch all of the Futures
    .map { ints =>
      ints.filter(isGoodInt)        //filter
          .map(transformInt)        //map
          .reduce(combineInts)      //reduce
    }
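A self-contained sketch of the Future-only approach, using just the standard library. The bodies of isGoodInt, transformInt, and combineInts, and the four-element iterator, are hypothetical stand-ins chosen for illustration; they are not from the question.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureOnlyExample {
  // Hypothetical stand-ins for the ??? bodies
  def isGoodInt(i: Int): Boolean = i % 2 == 0      // filter: keep even values
  def transformInt(i: Int): Int = i * 10           // map: scale by ten
  def combineInts(i: Int, j: Int): Int = i + j     // reduce: sum

  // Assumed finite iterator; each Future starts when the iterator yields it
  def nextElemIter: Iterator[Future[Int]] =
    Iterator(1, 2, 3, 4).map(n => Future(n))

  // Collect all Futures into one, then filter, map, and reduce the results
  def finalVal(): Future[Int] =
    Future
      .sequence(nextElemIter.toList)
      .map(_.filter(isGoodInt).map(transformInt).reduce(combineInts))

  def main(args: Array[String]): Unit =
    println(Await.result(finalVal(), 5.seconds))   // prints 60 (20 + 40)
}
```

Note that reduce fails the resulting Future if no value passes the filter, so a fold with a starting value may be a safer choice when that can happen.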

Compared with a somewhat indirect way of using the Stream as you suggested:

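// requires an implicit Materializer in scope
val finalVal : Future[Int] = 
  Source.fromIterator(() => nextElemIter)
        .via(Flow[Future[Int]].mapAsync(1)(identity))
        .via(Flow[Int].filter(isGoodInt))
        .via(Flow[Int].map(transformInt))
        .runWith(Sink.reduce[Int](combineInts))  //runWith keeps the Sink's Future[Int]; .to(...).run() would yield NotUsed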
Ramón J Romero y Vigil answered Sep 20 '22 03:09
