
fork and join using Akka

Problem statement: I have a portfolio of securities that need to be processed in parallel. In Java I used a thread pool to process each security and a latch to count down. Once everything is complete, I do some merging, etc.
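For comparison, the thread-pool-and-latch baseline described above might look roughly like the sketch below. It is written in Scala against `java.util.concurrent` and is not the asker's actual code: `LatchSketch`, `process`, `processAll`, the pool size and the timeout are all hypothetical placeholders.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object LatchSketch {
  // Hypothetical per-item work standing in for processing one security.
  def process(name: String): String = name.toUpperCase

  def processAll(names: Vector[String]): Vector[String] = {
    val pool = Executors.newFixedThreadPool(4) // pool size is an arbitrary assumption
    val latch = new CountDownLatch(names.size)
    // Each task writes only its own slot; a successful latch.await makes the writes visible.
    val results = new Array[String](names.size)
    try {
      names.zipWithIndex.foreach { case (name, i) =>
        pool.execute(new Runnable {
          def run(): Unit = {
            try { results(i) = process(name) }
            finally { latch.countDown() } // count down even if process throws
          }
        })
      }
      // Join: block until every task has counted down, or give up after the timeout.
      if (!latch.await(10, TimeUnit.SECONDS))
        throw new IllegalStateException("timed out waiting for tasks")
      results.toVector
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    println(processAll(Vector("ibm", "msft")))
}
```

The manual bookkeeping here (the latch, the shared result array, the pool shutdown) is what a future-based approach removes.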

So I message my SecurityProcessor (which is an actor) and wait for all the futures to complete. At the end I use a MergeHelper to do the post-processing. The SecurityProcessor takes a security, does some I/O and processing, and replies with a Security.

  val listOfFutures = new ListBuffer[Future[Security]]()
  var portfolioResponse: Portfolio = _
  for (security <- portfolio.getSecurities.toList) {
    val securityProcessor = actorOf[SecurityProcessor].start()
    listOfFutures += (securityProcessor ? security) map {
      _.asInstanceOf[Security]
    }
  }
  val futures = Future.sequence(listOfFutures.toList)
  futures.map {
    listOfSecurities =>
      portfolioResponse = MergeHelper.merge(portfolio, listOfSecurities)
  }.get

Is this design correct, and is there a better/cooler way to implement this common problem using Akka?

Debajyoti Roy asked Nov 07 '11 16:11



1 Answer

  val futureResult = Future.sequence(
    portfolio.getSecurities.toList map { security =>
      (actorOf[SecurityProcessor].start() ? security).mapTo[Security]
    }
  ) map { securities => MergeHelper.merge(portfolio, securities) }
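The same fork-and-join shape can be tried without an Akka dependency. The following is a minimal sketch using only `scala.concurrent` from the standard library, so it swaps the actor ask (`?`) for a plain `Future`. The `Security` and `Portfolio` case classes, `process`, and `merge` are hypothetical stand-ins for the asker's types, `SecurityProcessor`, and `MergeHelper.merge`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-ins for the domain types in the question.
final case class Security(id: String, price: BigDecimal)
final case class Portfolio(securities: List[Security])

object ForkJoinSketch {
  // Hypothetical per-security work; the real SecurityProcessor would do I/O here.
  def process(security: Security): Future[Security] =
    Future(security.copy(price = security.price * BigDecimal(2)))

  // Hypothetical merge step standing in for MergeHelper.merge.
  def merge(portfolio: Portfolio, processed: List[Security]): Portfolio =
    portfolio.copy(securities = processed)

  // Fork: start one Future per security.
  // Join: Future.sequence completes once every element has completed,
  // and the resulting list keeps the order of the input list.
  def processPortfolio(portfolio: Portfolio): Future[Portfolio] =
    Future.sequence(portfolio.securities.map(process))
      .map(processed => merge(portfolio, processed))

  def main(args: Array[String]): Unit = {
    val input = Portfolio(List(Security("A", BigDecimal(1)), Security("B", BigDecimal(2))))
    // Blocking is only for the demo; callers would normally keep composing the Future.
    println(Await.result(processPortfolio(input), 5.seconds))
  }
}
```

Compared with the code in the question, there is no mutable `ListBuffer` or `var`: the merged portfolio is the value of the returned future, and if any single future fails, the combined future fails with it.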
Viktor Klang answered Oct 06 '22 01:10
