Future.traverse seems to work sequentially and not in parallel. Is this true?

Tags:

scala

My question is simple and concerns the Future.traverse method. I have a list of Strings, each of which is a URL to a web page. I also have a class that takes a URL, loads the web page and parses some data from it. All of this is wrapped in Future{} so that the result is processed asynchronously.

A simplified version of the class looks like this:

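import scala.concurrent.{ExecutionContext, Future}

class RatingRetriever(context: ExecutionContext) {
  def resolveFilmToRating(url: String): Future[Option[Double]] = {
    Future {
      // Here it creates a Selenium web driver, loads the URL and parses it.
      None // placeholder: the real code returns the parsed rating
    }(context)
  }
}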

Then, in another object, I have this:

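    import java.util.concurrent.Executors
    import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
    import org.openqa.selenium.WebElement

    implicit val executionContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))
    .......
    val links: List[String] = films.map(film => film.asInstanceOf[WebElement].getAttribute("href"))
    val ratings: Future[List[Option[Double]]] = Future.traverse(links)(link => new RatingRetriever(executionContext).resolveFilmToRating(link))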

When it runs, I can clearly see it going through the collection sequentially. If I change the execution context from a fixed-size pool to a single-thread pool, the behavior is the same. So I wonder how I can make Future.traverse work in parallel. Can you advise?

Alexander Arendar asked Jan 09 '16 13:01


1 Answer

Take a look at the source of traverse (Scala 2.11):

in.foldLeft(successful(cbf(in))) { (fr, a) => // walks the collection one element at a time
  val fb = fn(a)                              // your function is called here, immediately
  for (r <- fr; b <- fb) yield (r += b)       // appends b to the builder once both futures complete
}.map(_.result())                             // extracts the final collection from the builder
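A minimal sketch of the same fold, specialised to List, makes the key point visible. The names `TraverseSketch` and `simpleTraverse` are hypothetical and this is a simplification, not the standard library's code: `fn(a)` is invoked for every element while folding, before any of the returned futures has completed, so traverse itself never waits between elements.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object TraverseSketch {
  // Hypothetical, simplified version of the fold above, specialised to List.
  def simpleTraverse[A, B](in: List[A])(fn: A => Future[B])
                          (implicit ec: ExecutionContext): Future[List[B]] =
    in.foldLeft(Future.successful(List.empty[B])) { (acc, a) =>
      val fb = fn(a)                          // called right away, while folding
      for (xs <- acc; b <- fb) yield b :: xs  // combined once both futures are done
    }.map(_.reverse)                          // elements were prepended, restore order
}

object TraverseSketchDemo extends App {
  implicit val ec: ExecutionContext = ExecutionContext.global
  val calls = new AtomicInteger(0)
  val promises = Vector.fill(3)(Promise[Int]())
  val result = TraverseSketch.simpleTraverse(List(0, 1, 2)) { i =>
    calls.incrementAndGet() // count how many times fn has been called
    promises(i).future      // a future that stays pending until we complete it
  }
  println(s"calls = ${calls.get}, completed = ${result.isCompleted}") // calls = 3, completed = false
  promises.zipWithIndex.foreach { case (p, i) => p.success(i * 10) }
  println(Await.result(result, 5.seconds)) // List(0, 10, 20)
}
```

Because fn is called eagerly, any blocking work it does before returning its Future holds up the whole fold, which is what the second example in this answer demonstrates.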

So how parallel your code is depends on your function fn. Take a look at two examples:

1) This code:

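import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FutureTraverse extends App {
  def log(s: String) = println(s"${Thread.currentThread.getName}: $s")

  // Returns a Future right away; the 1-second sleep happens inside it, on the pool
  def withDelay(i: Int) = Future {
    log(s"withDelay($i)")
    Thread.sleep(1000)
    i
  }

  val seq = 0 to 10

  // A "ticker" that logs a dot once per second, to show the passage of time
  Future {
    for (i <- 0 to 5) {
      log(".")
      Thread.sleep(1000)
    }
  }

  val resultSeq = Future.traverse(seq)(withDelay(_))

  Thread.sleep(6000) // keep the main thread alive long enough to see the output
}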

produces output like this (thread names vary from run to run):

ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-3: withDelay(0)
ForkJoinPool-1-worker-1: withDelay(1)
ForkJoinPool-1-worker-7: withDelay(2)
ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-3: withDelay(3)
ForkJoinPool-1-worker-1: withDelay(4)
ForkJoinPool-1-worker-7: withDelay(5)
ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-3: withDelay(6)
ForkJoinPool-1-worker-1: withDelay(7)
ForkJoinPool-1-worker-7: withDelay(8)
ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-3: withDelay(9)
ForkJoinPool-1-worker-1: withDelay(10)
ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-5: .

2) Now change just the withDelay function:

  def withDelay(i: Int) = {
    Thread.sleep(1000) // runs on the thread calling traverse, before the Future exists
    Future {
      log(s"withDelay($i)")
      i
    }
  }

and the output becomes sequential:

ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-5: withDelay(0)
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-1: withDelay(1)
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-1: withDelay(2)
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-1: withDelay(3)
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-1: withDelay(4)
ForkJoinPool-1-worker-7: withDelay(5)
ForkJoinPool-1-worker-1: withDelay(6)
ForkJoinPool-1-worker-1: withDelay(7)
ForkJoinPool-1-worker-7: withDelay(8)
ForkJoinPool-1-worker-7: withDelay(9)
ForkJoinPool-1-worker-7: withDelay(10)
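The difference comes from where the blocking happens. The sketch below (hypothetical names `ThreadCheck` and `threadsBeforeFuture`, sleep shortened to 100 ms, assuming Scala 2.12 or later) records the thread that runs the code placed before `Future { ... }`. Since traverse calls the function while folding on the caller's thread, each `Thread.sleep` runs there, one after the other.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ThreadCheck {
  // Hypothetical helper: traverses 0 until n with a function shaped like the second
  // withDelay (block first, then create the Future) and returns the names of the
  // threads on which the blocking part ran.
  def threadsBeforeFuture(n: Int): List[String] = {
    val seen = ListBuffer.empty[String]
    val all = Future.traverse((0 until n).toList) { i =>
      seen.synchronized { seen += Thread.currentThread.getName } // record who runs this part
      Thread.sleep(100) // blocks whichever thread is running traverse
      Future(i)         // only this part is handed to the pool
    }
    Await.result(all, 5.seconds)
    seen.synchronized(seen.toList)
  }
}

object ThreadCheckDemo extends App {
  val caller = Thread.currentThread.getName
  val names = ThreadCheck.threadsBeforeFuture(3)
  println(names.forall(_ == caller)) // true: every sleep ran on the caller's thread
}
```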

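So Future.traverse is not inherently parallel: it simply calls your function for each element, one after another, and combines the futures it gets back. All the parallelism comes from the function you pass in. If it returns a Future immediately and does its work inside that Future, the tasks run concurrently (up to the size of the thread pool); if it blocks before returning, the traversal is effectively sequential.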
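A minimal sketch, assuming Scala 2.12 or later and a hypothetical helper `ParallelCheck.allRanTogether`, of how one might check that the tasks really overlap when the function returns its Future immediately. It also illustrates that the pool size caps concurrency: with a fixed pool of two threads, as in the question, at most two tasks can run at once.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelCheck {
  // Hypothetical helper: runs n tasks through Future.traverse on a fixed pool of
  // poolSize threads. Each task counts down a shared latch, then waits up to 1 s
  // for it to reach zero, so the result is true only if all n tasks had started
  // before any of them gave up waiting, i.e. all n overlapped.
  def allRanTogether(n: Int, poolSize: Int): Boolean = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val started = new CountDownLatch(n)
    try {
      val results = Future.traverse((1 to n).toList) { _ =>
        Future { // returned immediately; the body runs on the pool
          started.countDown()
          started.await(1, TimeUnit.SECONDS)
        }
      }
      Await.result(results, 10.seconds).forall(identity)
    } finally pool.shutdown()
  }
}

object ParallelCheckDemo extends App {
  println(ParallelCheck.allRanTogether(4, 4)) // true: four threads, four overlapping tasks
  println(ParallelCheck.allRanTogether(4, 2)) // false: only two tasks can run at once
}
```

Using a CountDownLatch instead of reading log timestamps makes the check deterministic: it depends only on whether all tasks are running at the same moment, not on how fast the machine is.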

nikiforo answered Sep 24 '22 20:09