Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to limit number of unprocessed Futures in Scala?

I cannot fund if there is way to limit number of unprocessed Futures in Scala. For example in following code:

import ExecutionContext.Implicits.global    
for (i <- 1 to N) {
  val f = Future {
    //Some Work with bunch of object creation
  }
}

if N is too big, it will eventually throw OOM. Is there a way to limit number of unprocessed Futures ether with queue-like wait or with exception?

like image 336
yurybubnov Avatar asked Jun 11 '16 00:06

yurybubnov


People also ask

Does Scala have onComplete blocking?

onComplete() we are no longer blocking for the result from the Future but instead we will receive a callback for either a Success or a Failure. As such, we've also had to import scala. util.

How do you handle Future Scala?

Future represents a result of an asynchronous computation that may or may not be available yet. When we create a new Future, Scala spawns a new thread and executes its code. Once the execution is finished, the result of the computation (value or exception) will be assigned to the Future.

What is ExecutionContext Scala?

An ExecutionContext is similar to an Executor: it is free to execute computations in a new thread, in a pooled thread or in the current thread (although executing the computation in the current thread is discouraged – more on that below). The scala.

What is await result in Scala?

Await. result tries to return the Future result as soon as possible and throws an exception if the Future fails with an exception while Await. ready returns the completed Future from which the result (Success or Failure) can safely be extracted.


1 Answers

So, the simplest answer is that you can create an ExecutionContext that blocks or throttles the execution of new tasks beyond a certain limit. See this blog post. For a more fleshed out example of a blocking Java ExecutorService, here is an example. [You can use it directly if you want, the library on Maven Central is here.] This wraps some nonblocking ExecutorService, which you can create using the factory methods of java.util.concurrent.Executors.

To convert a Java ExecutorService into a Scala ExecutionContext is just ExecutionContext.fromExecutorService( executorService ). So, using the library linked above, you might have code like...

import java.util.concurrent.{ExecutionContext,Executors}
import com.mchange.v3.concurrent.BoundedExecutorService

val executorService = new BoundedExecutorService(
  Executors.newFixedThreadPool( 10 ), // a pool of ten Threads
  100,                                // block new tasks when 100 are in process
  50                                  // restart accepting tasks when the number of in-process tasks falls below 50
 )

implicit val executionContext = ExecutionContext.fromExecutorService( executorService )

// do stuff that creates lots of futures here...

That's fine if you want a bounded ExecutorService that will last as long as your whole application. But if you are creating lots of futures in a localized point in your code, and you will want to shut down the ExecutorService when you are done with it. I define loan-pattern methods in Scala [maven central] that both create the context and shut it down after I'm done. The code ends up looking like...

import com.mchange.sc.v2.concurrent.ExecutionContexts

ExecutionContexts.withBoundedFixedThreadPool( size = 10, blockBound = 100, restartBeneath = 50 ) { implicit executionContext =>
    // do stuff that creates lots of futures here...

    // make sure the Futures have completed before the scope ends!
    // that's important! otherwise, some Futures will never get to run
}

Rather than using an ExecutorService, that blocks outright, you can use an instance that slows things down by forcing the task-scheduling (Future-creating) Thread to execute the task rather than running it asynchronously. You'd make a java.util.concurrent.ThreadPoolExecutor using ThreadPoolExecutor.CallerRunsPolicy. But ThreadPoolExecutor is fairly complex to build directly.

A newer, sexier, more Scala-centric alternative to all of this would be to check out Akka Streams as an alternative to Future for concurrent execution with "back-pressure" to prevent OutOfMemoryErrors.

like image 125
Steve Waldman Avatar answered Sep 27 '22 16:09

Steve Waldman