I need to create an akka.stream.scaladsl.Source[T, Unit] from a collection of Future[T].
E.g., having a collection of futures returning integers,
val f1: Future[Int] = ???
val f2: Future[Int] = ???
val fN: Future[Int] = ???
val futures = List(f1, f2, fN)
how do I create a
val source: Source[Int, Unit] = ???
from it?
I cannot use the Future.sequence combinator, because then I would have to wait for every future to complete before getting anything from the source. I want to get results in any order, as soon as any future completes.
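To make the concern about Future.sequence concrete, here is a minimal sketch that uses only the Scala standard library. The object name SequenceDemo and the use of Promise to control completion are assumptions made for illustration; they are not part of the question.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object, not from the original question.
object SequenceDemo {
  def run(): (Boolean, List[Int]) = {
    // Promises let us decide exactly when each future completes.
    val p1 = Promise[Int]()
    val p2 = Promise[Int]()

    // Future.sequence turns List[Future[Int]] into Future[List[Int]].
    val all: Future[List[Int]] = Future.sequence(List(p1.future, p2.future))

    p1.success(1)
    // One input is done, but the combined future is still pending.
    val completedAfterFirst = all.isCompleted

    p2.success(2)
    // Only now can the combined future complete; it keeps the input order.
    val result = Await.result(all, 1.second)

    (completedAfterFirst, result)
  }
}
```

The first value is never visible on its own: the combined future yields nothing until the last input has completed.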
I understand that Source is a purely functional API and that it should not run anything until it is materialized. So my idea is to use an Iterator (which is lazy) to create a source:
Source { () =>
  new Iterator[Future[Int]] {
    override def hasNext: Boolean = ???
    override def next(): Future[Int] = ???
  }
}
But that would be a source of futures, not of actual values. I could also block in next using Await.result(future), but I'm not sure which thread pool's thread would be blocked. Also, this would wait on the futures sequentially, while I need parallel execution.
UPDATE 2: it turned out there was a much easier way to do it (thanks to Viktor Klang):
Source(futures).mapAsync(1)(identity)
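One caveat: mapAsync emits results in the order of the upstream elements, whereas mapAsyncUnordered emits each result as soon as its future completes, which seems closer to the "any order" requirement above. Below is a minimal sketch of that variant. It assumes an Akka Streams version that provides ActorMaterializer and Sink.seq; the object name UnorderedDemo and the choice of parallelism are illustrative, and Await is used only to keep the demo short.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not from the original post.
object UnorderedDemo {
  def run(futures: List[Future[Int]]): Seq[Int] = {
    implicit val system = ActorSystem("unordered-demo")
    implicit val materializer = ActorMaterializer()
    try {
      // Allow all futures to be in flight at once (parallelism must be positive).
      val parallelism = math.max(1, futures.size)
      val source = Source(futures).mapAsyncUnordered(parallelism)(identity)
      // Collect every emitted element; blocking here is for the demo only.
      Await.result(source.runWith(Sink.seq[Int]), 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

The futures in the list are already running before the stream is materialized, so the parallelism argument only limits how many of them the stage waits on at the same time.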
UPDATE: here is what I've got based on @sschaef's answer:
def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T, Unit] = {
  def run(actor: ActorRef): Unit = {
    futures.foreach { future =>
      future.onComplete {
        case Success(value) =>
          actor ! value
        case Failure(NonFatal(t)) =>
          actor ! Status.Failure(t) // to signal error
      }
    }
    Future.sequence(futures).onSuccess { case _ =>
      actor ! Status.Success(()) // to signal stream's end
    }
  }
  Source.actorRef[T](futures.size, OverflowStrategy.fail).mapMaterializedValue(run)
}
// ScalaTest tests follow
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

"futuresToSource" should "convert futures collection to akka-stream source" in {
  val f1 = Future(1)
  val f2 = Future(2)
  val f3 = Future(3)
  whenReady {
    futuresToSource(List(f1, f2, f3)).runFold(Seq.empty[Int])(_ :+ _)
  } { results =>
    results should contain theSameElementsAs Seq(1, 2, 3)
  }
}

it should "fail on future failure" in {
  val f1 = Future(1)
  val f2 = Future(2)
  val f3 = Future.failed(new RuntimeException("future failed"))
  whenReady {
    futuresToSource(List(f1, f2, f3)).runWith(Sink.ignore).failed
  } { t =>
    t shouldBe a [RuntimeException]
    t should have message "future failed"
  }
}
Akka Streams is a library to process and transfer a sequence of elements using bounded buffer space. This latter property is what we refer to as boundedness, and it is the defining feature of Akka Streams. An Akka stream consists of three major components: Source, Flow, and Sink.
Actor Materializer Lifecycle. The Materializer is a component that is responsible for turning the stream blueprint into a running stream and emitting the “materialized value”.
The Akka Streams library calls them materialized values. That's because, when you plug components together, you have an inert graph, but when you call the run method, the graph comes alive, or is materialized. The Jedi value returned by materializing a graph is called a materialized value.
A Flow is a set of stream processing steps that has one open input and one open output.
Create a source of Futures and then "flatten" it via mapAsync:
scala> Source(List(f1,f2,fN)).mapAsync(1)(identity)
res0: akka.stream.scaladsl.Source[Int,Unit] = akka.stream.scaladsl.Source@3e10d804