Let's say I have some iterator:
val nextElemIter: Iterator[Future[Int]] = Iterator.continually(...)
And I want to build a source from that iterator:
val source: Source[Future[Int], NotUsed] =
  Source.fromIterator(() => nextElemIter)
So now my source emits Futures. I have never seen futures being passed between stages in the Akka docs or anywhere else, so instead, I could always do something like this:
val source: Source[Int, NotUsed] =
  Source.fromIterator(() => nextElemIter).mapAsync(1)(identity /* was n => n */)
And now I have a regular source that emits T instead of Future[T]. But this feels hacky and wrong.
What's the proper way to deal with such situations?
Answering your question directly: I agree with Vladimir's comment that there is nothing "hacky" about using mapAsync for the purpose you described. I can't think of any more direct way to unwrap the Future from around your underlying Int values.
Answering your question indirectly...
Try to stick with Futures
Streams, as a concurrency mechanism, are incredibly useful when backpressure is required. However, pure Future operations have their place in applications as well.
If your Iterator[Future[Int]] is going to produce a known, limited number of Future values, then you may want to stick with using the Futures for concurrency.
Imagine you want to filter, map, & reduce the Int values.
def isGoodInt(i : Int) : Boolean = ??? //filter
def transformInt(i : Int) : Int = ??? //map
def combineInts(i : Int, j : Int) : Int = ??? //reduce
Futures provide a direct way of using these functions:
val finalVal : Future[Int] =
  Future.sequence(nextElemIter.toSeq) //launch all of the Futures
    .map { ints =>
      //filter, map, & reduce once every Future has completed
      ints.filter(isGoodInt).map(transformInt).reduce(combineInts)
    }
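To make the Futures-only approach concrete, here is a minimal runnable sketch that uses only the Scala standard library. The bodies of isGoodInt, transformInt and combineInts, the six-element iterator, and the object name FutureOnlyPipeline are all hypothetical stand-ins chosen for illustration; they are not from the question.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureOnlyPipeline {
  // Hypothetical stand-ins for the ??? placeholders.
  def isGoodInt(i: Int): Boolean = i % 2 == 0   // filter: keep even values
  def transformInt(i: Int): Int = i * 10        // map
  def combineInts(i: Int, j: Int): Int = i + j  // reduce

  // Assumed finite source of Futures; take(6) bounds it so that
  // collecting every element terminates.
  def nextElemIter: Iterator[Future[Int]] =
    Iterator.from(1).map(n => Future(n)).take(6)

  def finalVal: Future[Int] =
    Future
      .sequence(nextElemIter.toList) // draining the iterator starts every Future
      .map { ints =>
        // reduce throws on an empty collection, so this sketch assumes
        // at least one value passes the filter.
        ints.filter(isGoodInt).map(transformInt).reduce(combineInts)
      }

  def main(args: Array[String]): Unit =
    println(Await.result(finalVal, 5.seconds)) // prints 120
}
```

With these stand-ins the even values 2, 4 and 6 become 20, 40 and 60, which sum to 120. Note that an unbounded iterator such as Iterator.continually(...) would never finish being collected, which is why this approach only suits a known, limited number of Futures.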
Compared with a somewhat indirect way of using the Stream as you suggested:
val finalVal : Future[Int] =
  Source.fromIterator(() => nextElemIter)
    .via(Flow[Future[Int]].mapAsync(1)(identity))
    .via(Flow[Int].filter(isGoodInt))
    .via(Flow[Int].map(transformInt))
    //runWith returns the Sink's Future[Int]; .to(...).run() would return NotUsed
    .runWith(Sink.reduce[Int](combineInts))