
Is there a built-in "slow" Future.traverse version?

Tags:

scala

future

I discovered that building a lot of Futures for a single user request is generally bad practice. These Futures can fill the execution context, which will affect other requests. This is unlikely to be something you really want. Keeping the number of Futures small is quite simple: create new Futures only in for-comprehensions, using flatMap, etc. But sometimes it is necessary to create a Future for each item of a Seq. Using Future.sequence or Future.traverse causes the problem described above. So I ended up with this solution, which doesn't create a Future for each collection item simultaneously:

  def ftraverse[A, B](xs: Seq[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
    if(xs.isEmpty) Future successful Seq.empty[B]
    else f(xs.head) flatMap { fh => ftraverse(xs.tail)(f) map (r => fh +: r) }
  }

I wonder whether I'm reinventing the wheel and such a function already exists somewhere in Scala's standard library. I would also like to know whether you have encountered the described problem and how you solved it. Perhaps, if this is a well-known issue with Futures, I should create a pull request to Future.scala so that this function (or a more generalized version of it) would be included in the standard library?

UPD: A more general version, with limited parallelism:

  def ftraverse[A, B](xs: Seq[A], chunkSize: Int, maxChunks: Int)(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
    val xss = xs.grouped(chunkSize).toList
    val chunks = xss.take(maxChunks-1) :+ xss.drop(maxChunks-1).flatten
    Future.sequence{ chunks.map(chunk => ftraverse(chunk)(f) ) } map { _.flatten }
  } 
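For illustration, here is a small self-contained sketch of the simple `ftraverse` from the question in action. The `work` task and the counters are mine, added purely to observe concurrency; they are not part of any library. Because each Future is created only inside the `flatMap` of the previous one, at most one is ever in flight:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

def ftraverse[A, B](xs: Seq[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] =
  if (xs.isEmpty) Future.successful(Seq.empty[B])
  else f(xs.head).flatMap(fh => ftraverse(xs.tail)(f).map(fh +: _))

// Counters to observe how many Futures run at once.
val inFlight = new AtomicInteger(0)
val maxSeen = new AtomicInteger(0)

// An illustrative task: doubles its input, tracking concurrency as it runs.
def work(i: Int): Future[Int] = Future {
  val now = inFlight.incrementAndGet()
  maxSeen.getAndUpdate(m => math.max(m, now))
  Thread.sleep(10)
  inFlight.decrementAndGet()
  i * 2
}

val result = Await.result(ftraverse(1 to 5)(work), 5.seconds)
// Each Future is created only after the previous one completes,
// so maxSeen stays at 1 even on a multi-threaded pool.
```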
asked Feb 14 '15 by Aleksander Alekseev


2 Answers

No, there isn't anything like this in the standard library. Whether there should be, I can't say. I don't think it's very common to want to execute Futures in strict sequence, and when you do, it's very easy to implement your own method, as you have. I personally keep such a method in my own libraries for this purpose. It would be convenient to have a way to do this with the standard library, though; if there were, it should be more generic.

It is actually very simple to modify the current traverse to process Futures sequentially, instead of in parallel. Here is the current version, which uses foldLeft instead of recursion:

def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
    in.foldLeft(Future.successful(cbf(in))) { (fr, a) =>
      val fb = fn(a)
      for (r <- fr; b <- fb) yield (r += b)
    }.map(_.result())

Each Future is created before the flatMap by the assignment val fb = fn(a) (and is thus started eagerly). To run them sequentially, all one needs to do is move fn(a) inside the flatMap, which delays the creation of each subsequent Future in the collection:

def traverseSeq[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
    in.foldLeft(Future.successful(cbf(in))) { (fr, a) =>
      for (r <- fr; b <- fn(a)) yield (r += b)
    }.map(_.result())
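To see the sequencing in action, here is a quick sketch, specialized to Seq to sidestep the CanBuildFrom machinery (the answer's version abstracts over M[X]). The `task` function is illustrative; it deliberately makes the first element the slowest, which would let later elements finish first under parallel traversal:

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Same idea as traverseSeq above, specialized to Seq for brevity:
// fn(a) sits inside the for (i.e., flatMap), so it is invoked only
// after the previous Future has completed.
def traverseSeq[A, B](in: Seq[A])(fn: A => Future[B]): Future[Seq[B]] =
  in.foldLeft(Future.successful(Vector.empty[B])) { (fr, a) =>
    for (r <- fr; b <- fn(a)) yield r :+ b
  }

val completionOrder = ArrayBuffer.empty[Int]

def task(i: Int): Future[Int] = Future {
  Thread.sleep(if (i == 1) 30 else 1) // the first task is deliberately slowest
  completionOrder.synchronized { completionOrder += i }
  i * 10
}

val out = Await.result(traverseSeq(Seq(1, 2, 3))(task), 5.seconds)
// Sequential traversal: task 2 is not even created until task 1 completes,
// so completion order matches input order despite the uneven sleeps.
```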

Another way you can limit the impact of executing a large number of Futures is to use a separate ExecutionContext for them. For example, in a web app I might keep one ExecutionContext for regular database calls, one for calls to Amazon S3, and one for slow database calls.

A very simple implementation could use fixed thread pools:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
val executionContext = ExecutionContext.fromExecutorService(executorService)

A large number of Futures executing here would fill the ExecutionContext, but it would prevent them from filling other contexts.
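A sketch of that idea, putting the two pieces together (the pool size, the `blockingEc` name, and the squaring task are all illustrative): the heavy traversal queues up on its own fixed pool, leaving the global context free for other requests.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A dedicated 4-thread pool reserved for the heavy work.
val pool = Executors.newFixedThreadPool(4)
implicit val blockingEc: ExecutionContext =
  ExecutionContext.fromExecutorService(pool)

// All of these Futures are scheduled on blockingEc (the implicit in scope),
// not on scala.concurrent.ExecutionContext.Implicits.global.
val squares = Future.traverse(1 to 8) { i =>
  Future { Thread.sleep(10); i * i }
}

val result = Await.result(squares, 5.seconds)
pool.shutdown()
```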

If you're using Akka, you can easily create ExecutionContexts from configuration using Dispatchers within an ActorSystem:

my-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-factor = 2.0
    parallelism-max = 10
  }
  throughput = 100
}

If you have an ActorSystem called system, you could then access it via:

implicit val executionContext = system.dispatchers.lookup("my-dispatcher")

All of this depends on your use case. While I do separate my asynchronous computations into different contexts, there are times when I still want to traverse sequentially to smooth out the usage of these contexts.

answered Oct 12 '22 by Michael Zajac


It seems that your problem is not related to the number of futures you create, but to the fairness with which they're executed. Consider how callbacks on futures (map, flatMap, onComplete, fold, etc.) are processed: they are placed in an executor's queue and are executed when the results of their parent futures are completed.

If all of your futures share the same executor (i.e., queue), they will indeed clobber each other as you say. A common way to solve this fairness issue is to use Akka actors. For each request, spin up a new actor (with its own queue) and have all actors of that type share an ExecutionContext. You can limit the maximum number of messages an actor will execute before moving on to another actor sharing that ExecutionContext using the throughput configuration property.

answered Oct 12 '22 by Zack Angelo