
Asynchronous IO in Scala with futures

Let's say I'm getting a (potentially big) list of images to download from some URLs. I'm using Scala, so this is what I would do:

    import scala.actors.Futures._

    // Retrieve URLs from somewhere
    val urls: List[String] = ...

    // Download image (blocking operation)
    val fimages: List[Future[...]] = urls.map (url => future { download url })

    // Do something (display) when complete
    fimages.foreach (_.foreach (display _))

I'm a bit new to Scala, so this still looks a little like magic to me:

  • Is this the right way to do it? Any alternatives if it is not?
  • If I have 100 images to download, will this create 100 threads at once, or will it use a thread pool?
  • Will the last instruction (display _) be executed on the main thread, and if not, how can I make sure it is?

Thanks for your advice!

F.X. asked Oct 27 '12 06:10




2 Answers

Use Futures in Scala 2.10. They were joint work between the Scala team, the Akka team, and Twitter to reach a more standardized future API and implementation for use across frameworks. We just published a guide at: http://docs.scala-lang.org/overviews/core/futures.html

Beyond being completely non-blocking (by default, though we provide the ability to do managed blocking operations) and composable, Scala 2.10's futures come with an implicit thread pool to execute your tasks on, as well as some utilities for managing timeouts.

    import scala.concurrent.{future, blocking, Future, Await}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    // Retrieve URLs from somewhere
    val urls: List[String] = ...

    // Download image (blocking operation)
    val imagesFuts: List[Future[...]] = urls.map {
      url => future { blocking { download(url) } }
    }

    // Do something (display) when complete
    val futImages: Future[List[...]] = Future.sequence(imagesFuts)
    Await.result(futImages, 10.seconds).foreach(display)

Above, we first import a number of things:

  • future: API for creating a future.
  • blocking: API for managed blocking.
  • Future: Future companion object which contains a number of useful methods for collections of futures.
  • Await: singleton object used for blocking on a future (transferring its result to the current thread).
  • ExecutionContext.Implicits.global: the default global thread pool, a ForkJoin pool.
  • duration._: utilities for managing durations for timeouts.

imagesFuts remains largely the same as what you originally did. The only difference is that we use managed blocking, via blocking. It notifies the thread pool that the block of code you pass to it contains long-running or blocking operations. This allows the pool to temporarily spawn new workers so that the workers are never all blocked at once. This is done to prevent starvation (locking up the thread pool) in blocking applications. Note that the thread pool also knows when the code in a managed blocking block is complete, so it will remove the spare worker thread at that point, which means that the pool will shrink back down to its expected size.
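As a minimal sketch of the managed-blocking pattern described above: the `download` helper here is a hypothetical stand-in (it only sleeps and returns the URL's bytes), and `Future { ... }` is used in place of the lowercase `future` so the snippet also compiles on newer Scala versions.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingSketch {
  // Hypothetical stand-in for a blocking download: sleeps, then returns fake bytes.
  def download(url: String): Array[Byte] = {
    Thread.sleep(100)
    url.getBytes("UTF-8")
  }

  // Each download is wrapped in `blocking`, which hints to the global pool
  // that it may add workers while these threads are parked.
  def downloadAll(urls: List[String]): List[Future[Array[Byte]]] =
    urls.map(url => Future { blocking { download(url) } })
}
```

Without the `blocking` wrapper the code still runs, but the downloads would be limited to the pool's normal parallelism.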

(If you want to absolutely prevent additional threads from ever being created, then you ought to use an AsyncIO library, such as Java's NIO library.)

Then we use Future.sequence, one of the collection methods on the Future companion object, to convert imagesFuts from List[Future[...]] to a Future[List[...]].
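The conversion can be seen in isolation in the following sketch. `fetchLength` is a hypothetical placeholder for the real download and simply returns the length of the URL string.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceSketch {
  // Hypothetical per-item task standing in for an image download.
  def fetchLength(url: String): Future[Int] = Future(url.length)

  // List[Future[Int]] => Future[List[Int]]; results keep the input order.
  def fetchAll(urls: List[String]): Future[List[Int]] =
    Future.sequence(urls.map(fetchLength))
}
```

Note that the combined future fails if any of the individual futures fails, so one bad URL would fail the whole batch unless each future is recovered first.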

The Await object is how we ensure that display is executed on the calling thread: Await.result simply forces the current thread to wait until the future that it is passed is completed. (This uses managed blocking internally.)
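A small sketch of that behaviour, with a made-up list of image names and a "display" step that only records which thread it ran on:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AwaitSketch {
  // Returns the names of the threads on which the display step ran.
  def displayedOn(): Set[String] = {
    val images: Future[List[String]] = Future(List("img1", "img2"))
    val seen = scala.collection.mutable.Set.empty[String]
    // Blocks the caller for up to 10 seconds; throws a TimeoutException
    // if the future has not completed by then.
    Await.result(images, 10.seconds).foreach { _ =>
      seen += Thread.currentThread.getName // hypothetical display step
    }
    seen.toSet
  }
}
```

Because the `foreach` runs on the list returned by `Await.result`, not on the future itself, it executes on whichever thread called `displayedOn`.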

Heather Miller answered Oct 20 '22 05:10


    val all = Future.traverse(urls) { url =>
      val f = future(download(url)) /*(downloadContext)*/
      f.onComplete(display)(displayContext)
      f
    }
    Await.result(all, ...)

  1. Use scala.concurrent.Future in 2.10, which is at the release-candidate stage now.
  2. It uses an implicit ExecutionContext.
  3. The new Future doc is explicit that onComplete (and foreach) may evaluate immediately if the value is available. The old actors Future does the same thing. Depending on what your requirement is for display, you can supply a suitable ExecutionContext (for instance, a single thread executor). If you just want the main thread to wait for loading to complete, traverse gives you a future to await on.
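One way to sketch the single-thread-executor idea from point 3 is shown below. The names are illustrative, the download is faked with `url.length`, and `andThen` is used instead of `onComplete` so that awaiting the traversed future also waits for the display step to finish.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Success

object DisplayContextSketch {
  // Returns the names of the threads that ran the display step.
  def displayThreads(urls: List[String]): Set[String] = {
    val executor = Executors.newSingleThreadExecutor()
    val displayContext = ExecutionContext.fromExecutor(executor)
    implicit val downloadContext: ExecutionContext = ExecutionContext.global
    val seen = new ConcurrentLinkedQueue[String]()

    val all = Future.traverse(urls) { url =>
      Future(url.length) // hypothetical download, runs on downloadContext
        .andThen { case Success(_) =>
          // hypothetical display step, runs on the single display thread
          seen.add(Thread.currentThread.getName)
        }(displayContext)
    }

    try Await.result(all, 10.seconds) finally executor.shutdown()
    seen.toArray(new Array[String](0)).toSet
  }
}
```

In a GUI application the display context would more likely wrap the toolkit's event-dispatch mechanism than a fresh executor; that part is left out here.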
som-snytt answered Oct 20 '22 05:10