My question is simple and concerns the Future.traverse method. I have a list of Strings, each of which is a URL to a web page. I also have a class that takes a URL, loads the web page and parses some data. All of this is wrapped in Future {} so the result is processed asynchronously.
A simplified version of the class looks like this:
class RatingRetriever(context: ExecutionContext) {
  def resolveFilmToRating(url: String): Future[Option[Double]] = {
    Future {
      // here it creates a Selenium web driver, loads the url and parses it
    }(context)
  }
}
Then, in another object, I have this:
implicit val executionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))
.......
val links:List[String] = films.map(film => film.asInstanceOf[WebElement].getAttribute("href"))
val ratings: Future[List[Option[Double]]] = Future.traverse(links)(link => new RatingRetriever(executionContext).resolveFilmToRating(link))
When it runs, I can clearly see that it goes through the collection sequentially. If I change the execution context from a fixed-size pool to a single-thread pool, the behavior is the same. So how can I make Future.traverse work in parallel? Can you advise?
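To see how much actually runs at once, the setup can be reproduced without Selenium. The following is a minimal sketch, assuming a hypothetical StubRatingRetriever in which Thread.sleep stands in for loading and parsing the page; the `running`/`maxSeen` counters and the `run` helper are also illustrative additions, not part of the original code.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TraverseRepro {
  val running = new AtomicInteger(0) // calls currently in progress
  val maxSeen = new AtomicInteger(0) // highest number of simultaneous calls observed

  // Hypothetical stand-in for RatingRetriever: no Selenium, just a delay.
  class StubRatingRetriever(context: ExecutionContext) {
    def resolveFilmToRating(url: String): Future[Option[Double]] =
      Future {
        val now = running.incrementAndGet()
        maxSeen.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
        try {
          Thread.sleep(200)         // pretend to load and parse the page
          Some(url.length.toDouble) // fake "rating" derived from the URL
        } finally running.decrementAndGet()
      }(context)
  }

  def run(links: List[String]): List[Option[Double]] = {
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val ratings: Future[List[Option[Double]]] =
        Future.traverse(links)(link => new StubRatingRetriever(ec).resolveFilmToRating(link))
      Await.result(ratings, 10.seconds)
    } finally pool.shutdown()
  }
}
```

Because the pool has only two threads, maxSeen cannot exceed 2: four links are processed at most two at a time.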
Future.sequence takes a list of futures and transforms it into a single future of a list, asynchronously. For instance, if you have a list of independent jobs to run simultaneously, the list of futures can be composed into a single future of a list using Future.sequence.
In other words, sequence converts a collection of Futures into a single Future of a collection: List[Future[T]] => Future[List[T]]. This is also known as composing Futures.
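A small sketch of that conversion, using three hypothetical jobs on the global ExecutionContext. Future.sequence(xs) yields the same result as Future.traverse(xs)(identity); the traverse form below builds each Future itself from the input values.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceDemo {
  // Three independent (hypothetical) jobs; each Future is started as soon as it is created.
  def jobs: List[Future[Int]] = List(1, 2, 3).map(n => Future(n * 10))

  // List[Future[Int]] => Future[List[Int]]; the order of the input list is preserved.
  def viaSequence: List[Int] = Await.result(Future.sequence(jobs), 5.seconds)

  // The same shape via traverse: map each input to a Future and collect the results.
  def viaTraverse: List[Int] =
    Await.result(Future.traverse(List(1, 2, 3))(n => Future(n * 10)), 5.seconds)
}
```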
A Future represents the result of an asynchronous computation that may or may not be available yet. When you create a new Future, Scala submits its body to the implicit ExecutionContext, which typically runs it on a thread from an existing pool rather than spawning a new thread. Once the computation finishes, its result (a value or an exception) is assigned to the Future.
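To illustrate where a Future's body actually runs: a minimal sketch, assuming a single-thread executor created just for this demo. Both Futures execute on that one pool thread, so neither of them gets a fresh thread of its own.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object WhereDoFuturesRun {
  // Returns the names of the threads that ran two separate Futures.
  def threadNames(): (String, String) = {
    val pool = Executors.newSingleThreadExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val f1 = Future(Thread.currentThread().getName)
      val f2 = Future(Thread.currentThread().getName)
      (Await.result(f1, 5.seconds), Await.result(f2, 5.seconds))
    } finally pool.shutdown()
  }
}
```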
NOTE: With Future.onComplete() we no longer block waiting for the Future's result; instead we receive a callback with either a Success or a Failure.
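A minimal onComplete sketch, assuming the global ExecutionContext. The describe helper and the Promise inside it are hypothetical and exist only so this demo can wait for the callback and return what it saw; in real code the callback would simply do its work.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object OnCompleteDemo {
  // Registers a non-blocking callback, then (for the demo only) waits for it to fire.
  def describe(f: Future[Int]): String = {
    val seen = Promise[String]()
    f.onComplete {
      case Success(v)  => seen.success(s"ok: $v")
      case Failure(ex) => seen.success(s"failed: ${ex.getMessage}")
    }
    Await.result(seen.future, 5.seconds)
  }
}
```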
Take a look at the source of traverse:
in.foldLeft(successful(cbf(in))) { (fr, a) => // we traverse the collection sequentially
  val fb = fn(a)                             // your function is called here
  for (r <- fr; b <- fb) yield (r += b)      // just add the element to the builder
}.map(_.result())                            // get the collection from the builder
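The fold can be re-created in a simplified, List-only form to check the key detail: fn(a) is called for every element while foldLeft is still running, before any of the earlier futures have completed. This is a sketch of the idea, not the library's actual code (the real implementation differs between Scala versions); traverseList and eagerCalls are hypothetical names.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object MyTraverse {
  // Simplified, List-only version of the fold above (a sketch, not the library code).
  def traverseList[A, B](in: List[A])(fn: A => Future[B])(implicit ec: ExecutionContext): Future[List[B]] =
    in.foldLeft(Future.successful(List.newBuilder[B])) { (fr, a) =>
      val fb = fn(a)                          // called immediately for each element
      for (r <- fr; b <- fb) yield (r += b)   // only the appending waits, keeping results in order
    }.map(_.result())

  // Counts how many times fn was called before any of its futures could complete.
  def eagerCalls(): (Int, List[Int]) = {
    import ExecutionContext.Implicits.global
    val gate = Promise[Unit]()
    var calls = 0
    val all = traverseList(List(1, 2, 3)) { x => calls += 1; gate.future.map(_ => x) }
    val before = calls // nothing can have completed yet: gate is still pending
    gate.success(())
    (before, Await.result(all, 5.seconds))
  }
}
```

eagerCalls reports 3 calls even though the gate Promise is still pending at that point, which shows that traverse itself does not wait between elements; any waiting has to come from fn.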
So how parallel your code is depends on your function fn. Take a look at two examples:
1) This code:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
object FutureTraverse extends App {
  def log(s: String) = println(s"${Thread.currentThread.getName}: $s")
  def withDelay(i: Int) = Future {
    log(s"withDelay($i)")
    Thread.sleep(1000)
    i
  }
  val seq = 0 to 10
  Future {
    for (i <- 0 to 5) {
      log(".")
      Thread.sleep(1000)
    }
  }
  val resultSeq = Future.traverse(seq)(withDelay(_))
  Thread.sleep(6000)
}
produces output like this:
ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-3: withDelay(0)
ForkJoinPool-1-worker-1: withDelay(1)
ForkJoinPool-1-worker-7: withDelay(2)
ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-3: withDelay(3)
ForkJoinPool-1-worker-1: withDelay(4)
ForkJoinPool-1-worker-7: withDelay(5)
ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-3: withDelay(6)
ForkJoinPool-1-worker-1: withDelay(7)
ForkJoinPool-1-worker-7: withDelay(8)
ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-3: withDelay(9)
ForkJoinPool-1-worker-1: withDelay(10)
ForkJoinPool-1-worker-5: .
ForkJoinPool-1-worker-5: .
2) Just change the withDelay function:
def withDelay(i: Int) = {
  Thread.sleep(1000)
  Future {
    log(s"withDelay($i)")
    i
  }
}
and you'll get sequential output:
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-5: withDelay(0)
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-1: withDelay(1)
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-1: withDelay(2)
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-1: withDelay(3)
ForkJoinPool-1-worker-7: .
ForkJoinPool-1-worker-1: withDelay(4)
ForkJoinPool-1-worker-7: withDelay(5)
ForkJoinPool-1-worker-1: withDelay(6)
ForkJoinPool-1-worker-1: withDelay(7)
ForkJoinPool-1-worker-7: withDelay(8)
ForkJoinPool-1-worker-7: withDelay(9)
ForkJoinPool-1-worker-7: withDelay(10)
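So Future.traverse is not parallel in itself: it calls your function on each element in turn and combines the futures it gets back. All of the parallelism comes from the function you pass in. If it returns a Future straight away (example 1), the tasks overlap; if it blocks before returning (example 2), they run one after another.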
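The difference between the two withDelay variants can also be measured directly. A minimal sketch, assuming a dedicated pool of 5 threads and 200 ms of simulated work per item; insideFuture and beforeFuture are hypothetical names for the two variants:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ParallelVsSequential {
  // Runs body and returns the elapsed wall-clock time in milliseconds.
  private def timed[A](body: => A): Long = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000
  }

  // Elapsed milliseconds (parallel, sequential) for traversing 5 items.
  def compare(): (Long, Long) = {
    val pool = Executors.newFixedThreadPool(5)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    // Like example 1: the slow work happens inside the Future.
    def insideFuture(i: Int): Future[Int] = Future { Thread.sleep(200); i }
    // Like example 2: the caller blocks before the Future is even created.
    def beforeFuture(i: Int): Future[Int] = { Thread.sleep(200); Future(i) }
    try {
      val items = (1 to 5).toList
      val parallel = timed(Await.result(Future.traverse(items)(insideFuture), 10.seconds))
      val sequential = timed(Await.result(Future.traverse(items)(beforeFuture), 10.seconds))
      (parallel, sequential)
    } finally pool.shutdown()
  }
}
```

The sequential figure is at least 5 × 200 ms because every call blocks the caller before its Future exists, while the parallel one should be close to a single 200 ms delay. Exact timings vary by machine.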