I discovered that creating a lot of Futures for a single user request is generally bad practice. These Futures can fill an execution context, which will affect other requests. That is unlikely to be something you really want. Keeping the number of Futures small is quite simple: create new Futures only in for-comprehensions, using flatMap, etc. But sometimes it is necessary to create a Future for each item of a Seq. Using Future.sequence or Future.traverse causes the problem described above, so I ended up with this solution, which doesn't create Futures for every collection item simultaneously:
def ftraverse[A, B](xs: Seq[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
  if (xs.isEmpty) Future successful Seq.empty[B]
  else f(xs.head) flatMap { fh => ftraverse(xs.tail)(f) map (r => fh +: r) }
}
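For illustration, here is how I call it; the doubling function is just a placeholder:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

def ftraverse[A, B](xs: Seq[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] =
  if (xs.isEmpty) Future.successful(Seq.empty[B])
  else f(xs.head).flatMap(fh => ftraverse(xs.tail)(f).map(fh +: _))

// Each Future is created only after the previous one has completed
val res = Await.result(ftraverse(Seq(1, 2, 3, 4, 5))(i => Future(i * 2)), 5.seconds)
// res: Seq(2, 4, 6, 8, 10), in input order
```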
I wonder: am I reinventing the wheel, and does such a function already exist somewhere in Scala's standard library? I would also like to know whether you have encountered the problem I described and how you solved it. If this is a well-known issue with Futures, maybe I should create a pull request against Future.scala so that this function (or a more generalized version of it) would be included in the standard library?
UPD: A more general version, with limited parallelism:
def ftraverse[A, B](xs: Seq[A], chunkSize: Int, maxChunks: Int)(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
  val xss = xs.grouped(chunkSize).toList
  val chunks = xss.take(maxChunks - 1) :+ xss.drop(maxChunks - 1).flatten
  Future.sequence { chunks.map(chunk => ftraverse(chunk)(f)) } map { _.flatten }
}
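A quick sanity check of how the chunking behaves; the squaring function and the concrete sizes are just for illustration:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

def ftraverse[A, B](xs: Seq[A])(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] =
  if (xs.isEmpty) Future.successful(Seq.empty[B])
  else f(xs.head).flatMap(fh => ftraverse(xs.tail)(f).map(fh +: _))

def ftraverse[A, B](xs: Seq[A], chunkSize: Int, maxChunks: Int)(f: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] = {
  val xss = xs.grouped(chunkSize).toList
  // at most maxChunks groups: the last one absorbs everything left over
  val chunks = xss.take(maxChunks - 1) :+ xss.drop(maxChunks - 1).flatten
  Future.sequence(chunks.map(chunk => ftraverse(chunk)(f))).map(_.flatten)
}

// chunkSize = 2, maxChunks = 2 splits 1..6 into Seq(1, 2) and Seq(3, 4, 5, 6):
// the two chunks run in parallel, items within a chunk run sequentially,
// and the result order matches the input order
val res = Await.result(ftraverse(1 to 6, 2, 2)(i => Future(i * i)), 5.seconds)
```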
No, there isn't anything like this in the standard library. Whether there should be or not, I can't say. I don't think it's very common to want to execute Futures in a strict sequence. But when you want to, it's very easy to implement your own method to do so, as you have. I personally just keep a method in my own libraries for this purpose. It would, however, be convenient to have a way to do this with the standard library. If there were, it should be more generic.
It is actually very simple to modify the current traverse to process Futures sequentially instead of in parallel. Here is the current version, which uses foldLeft instead of recursion:
def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
  in.foldLeft(Future.successful(cbf(in))) { (fr, a) =>
    val fb = fn(a)
    for (r <- fr; b <- fb) yield (r += b)
  }.map(_.result())
The Futures are created before the flatMap by the assignment val fb = fn(a) (and are thus executed earlier). All one needs to do is move fn(a) inside the flatMap to delay the creation of subsequent Futures in the collection.
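You can see the eager creation directly: because fn(a) is called inside the foldLeft itself, every Future exists before any of them has completed. A small demonstration, using a Promise as a gate that holds all of them open (the names here are mine):

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

val started = new AtomicInteger(0)
val gate = Promise[Unit]()

val f = Future.traverse(Seq(1, 2, 3)) { i =>
  started.incrementAndGet()   // runs as soon as traverse calls fn(i)
  gate.future.map(_ => i)     // every Future waits on the same gate
}

// All three Futures have already been created, even though none can complete yet
val createdBeforeAnyCompleted = started.get
gate.success(())
val res = Await.result(f, 5.seconds)
```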
def traverseSeq[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]], executor: ExecutionContext): Future[M[B]] =
  in.foldLeft(Future.successful(cbf(in))) { (fr, a) =>
    for (r <- fr; b <- fn(a)) yield (r += b)
  }.map(_.result())
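To check the sequencing, here is a Seq-only sketch of the same idea (avoiding the CanBuildFrom machinery; the name and the Vector accumulator are my choices), with a log that records when each Future actually runs:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import ExecutionContext.Implicits.global

def traverseSeq[A, B](in: Seq[A])(fn: A => Future[B])(implicit ec: ExecutionContext): Future[Seq[B]] =
  in.foldLeft(Future.successful(Vector.empty[B])) { (fr, a) =>
    for (r <- fr; b <- fn(a)) yield r :+ b   // fn(a) runs only after fr completes
  }

val log = scala.collection.mutable.Buffer.empty[Int]
val res = Await.result(
  traverseSeq(Seq(1, 2, 3))(i => Future { log += i; i * 10 }),
  5.seconds)
// res is Vector(10, 20, 30), and log is 1, 2, 3 in order: each Future
// started only after the previous one finished
```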
Another way you can limit the impact of executing a large number of Futures is by using a different ExecutionContext for them. For example, in a web app I might keep one ExecutionContext for database calls, one for calls to Amazon S3, and one for slow database calls.
A very simple implementation could use fixed thread pools:
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
val executorService = Executors.newFixedThreadPool(4)
val executionContext = ExecutionContext.fromExecutorService(executorService)
A large number of Futures executing here would fill this ExecutionContext, but it would prevent them from filling other contexts.
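Putting the two snippets together, a minimal self-contained sketch (the name dbContext is mine) showing that Futures created against the dedicated context actually run on its pool:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

val executorService = Executors.newFixedThreadPool(4)
implicit val dbContext: ExecutionContext =
  ExecutionContext.fromExecutorService(executorService)

// With dbContext as the only implicit in scope, this Future is queued on
// the 4-thread pool above rather than on the global context
val f = Future { Thread.currentThread.getName }
val threadName = Await.result(f, 5.seconds)   // e.g. "pool-1-thread-1"
executorService.shutdown()
```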
If you're using Akka, you can easily create ExecutionContexts from configuration using Dispatchers within an ActorSystem:
my-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-factor = 2.0
    parallelism-max = 10
  }
  throughput = 100
}
If you have an ActorSystem called system, you could then access it via:
implicit val executionContext = system.dispatchers.lookup("my-dispatcher")
All of this depends on your use case. While I do separate my asynchronous computations into different contexts, there are times when I still want to traverse sequentially to smooth out the usage of these contexts.
It seems that your problem is not related to the number of Futures you create, but to the fairness with which they're executed. Consider how callbacks on Futures (map, flatMap, onComplete, fold, etc.) are processed: they are placed in an executor's queue and are executed when their parent Futures complete.
If all of your Futures share the same executor (i.e., queue), they will indeed clobber each other as you say. A common way to solve this fairness issue is to use Akka actors. For each request, spin up a new actor (with its own queue) and have all actors of that type share an ExecutionContext. You can limit the maximum number of messages an actor will execute before moving on to another actor sharing that ExecutionContext using the throughput configuration property.