
In akka-stream how to create an unordered Source from a futures collection

I need to create an akka.stream.scaladsl.Source[T, Unit] from a collection of Future[T].

E.g., having a collection of futures returning integers,

val f1: Future[Int] = ???
val f2: Future[Int] = ???
val fN: Future[Int] = ???
val futures = List(f1, f2, fN)

how do I create a

val source: Source[Int, Unit] = ???

from it?

I cannot use the Future.sequence combinator, since then I would have to wait for every future to complete before getting anything from the source. I want to get results in any order, as soon as any future completes.
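
To illustrate the point about Future.sequence, here is a minimal sketch using only the Scala standard library. The object name SequenceWaitsForAll and the promises p1 and p2 are hypothetical, introduced only for this illustration. It shows that the combined future stays incomplete while any input is still pending, even though another input has already finished.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object, not part of the original question.
object SequenceWaitsForAll {
  def demo(): (Boolean, Boolean, List[Int]) = {
    // Promises let us decide exactly when each future completes.
    val p1 = Promise[Int]()
    val p2 = Promise[Int]()
    val all: Future[List[Int]] = Future.sequence(List(p1.future, p2.future))

    p2.success(2)                              // the second future finishes first
    val secondDone   = p2.future.isCompleted   // true
    val allDoneEarly = all.isCompleted         // false: p1 is still pending

    p1.success(1)                              // now every input is complete
    val result = Await.result(all, 5.seconds)  // List(1, 2), in list order

    (secondDone, allDoneEarly, result)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The value 2 is available early, but nothing can be read from the combined future until p1 completes as well, which is the behaviour the question wants to avoid.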

I understand that Source is a purely functional API and that it should not run anything before it is materialized. So, my idea is to use an Iterator (which is lazy) to create a source:

Source { () =>
  new Iterator[Future[Int]] {
    override def hasNext: Boolean = ???
    override def next(): Future[Int] = ???
  }
}

But that would be a source of futures, not of actual values. I could also block in next using Await.result(future), but I'm not sure which thread pool's thread would be blocked. Also, this would wait on the futures sequentially, while I need parallel execution.

UPDATE 2: it turned out there was a much easier way to do it (thanks to Viktor Klang):

Source(futures).mapAsync(1)(identity)

UPDATE: here is what I've got based on @sschaef's answer:

def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T, Unit] = {
  def run(actor: ActorRef): Unit = {
    futures.foreach { future =>
      future.onComplete {
        case Success(value) =>
          actor ! value
        case Failure(NonFatal(t)) =>
          actor ! Status.Failure(t) // to signal error
      }
    }

    Future.sequence(futures).onSuccess { case _ =>
      actor ! Status.Success(()) // to signal stream's end
    }
  }

  Source.actorRef[T](futures.size, OverflowStrategy.fail).mapMaterializedValue(run)
}

// ScalaTest tests follow

import scala.concurrent.ExecutionContext.Implicits.global

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

"futuresToSource" should "convert futures collection to akka-stream source" in {
  val f1 = Future(1)
  val f2 = Future(2)
  val f3 = Future(3)

  whenReady {
    futuresToSource(List(f1, f2, f3)).runFold(Seq.empty[Int])(_ :+ _)
  } { results =>
    results should contain theSameElementsAs Seq(1, 2, 3)
  }
}

it should "fail on future failure" in {
  val f1 = Future(1)
  val f2 = Future(2)
  val f3 = Future.failed(new RuntimeException("future failed"))

  whenReady {
    futuresToSource(List(f1, f2, f3)).runWith(Sink.ignore).failed
  } { t =>
    t shouldBe a [RuntimeException]
    t should have message "future failed"
  }
}
Tvaroh asked Sep 06 '15 13:09




1 Answer

Create a source of Futures and then "flatten" it via mapAsync:

scala> Source(List(f1,f2,fN)).mapAsync(1)(identity)
res0: akka.stream.scaladsl.Source[Int,Unit] = akka.stream.scaladsl.Source@3e10d804
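
One caveat: mapAsync emits results in upstream order, so a slow future at the head of the list holds back results that are already available. If completion order is what is wanted, mapAsyncUnordered may be the better fit. The following is a minimal sketch, not part of the original answer. It assumes Akka 2.6 or later, where an implicit ActorSystem supplies the materializer and the materialized type is NotUsed. With the 2015-era API used in the question, an ActorMaterializer and Source[Int, Unit] would be needed instead. The object name UnorderedFuturesSketch and the slow promise are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical demo object; assumes Akka 2.6+ on the classpath.
object UnorderedFuturesSketch {
  def run(): Seq[Int] = {
    // Assumption: the implicit ActorSystem provides the stream materializer.
    implicit val system: ActorSystem = ActorSystem("unordered-futures-sketch")
    try {
      val slow = Promise[Int]() // stays pending until completed below
      val futures = List(slow.future, Future.successful(2))

      // Parallelism covers every future, so a pending one cannot hold back the rest.
      val source: Source[Int, NotUsed] =
        Source(futures).mapAsyncUnordered(math.max(1, futures.size))(identity)

      val collected = source
        .map { n =>
          // Complete the slow future only after 2 has already been emitted.
          if (n == 2) slow.trySuccess(1)
          n
        }
        .runWith(Sink.seq)

      Await.result(collected, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In this sketch the slow future is only completed once 2 has passed downstream, so the stream can only finish if 2 is emitted ahead of the first list element. Swapping in mapAsync here would be expected to time out, because it waits for the head of the list first.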
Viktor Klang answered Oct 13 '22 03:10