Problem statement: I have a portfolio of securities that need to be processed in parallel. In Java I used a thread pool to process each security and a latch to count down; once everything was complete, I did some merging etc.
So I message my SecurityProcessor (which is an actor) and wait for all the futures to complete. At the end I use a MergeHelper to do the post-processing. The SecurityProcessor takes a security, does some I/O and processing, and replies with a Security.
val listOfFutures = new ListBuffer[Future[Security]]()
var portfolioResponse: Portfolio = _
for (security <- portfolio.getSecurities.toList) {
  val securityProcessor = actorOf[SecurityProcessor].start()
  listOfFutures += (securityProcessor ? security) map {
    _.asInstanceOf[Security]
  }
}
val futures = Future.sequence(listOfFutures.toList)
futures.map {
  listOfSecurities =>
    portfolioResponse = MergeHelper.merge(portfolio, listOfSecurities)
}.get
Is this design correct, and is there a better/cooler way to implement this common problem using Akka?
In parallel computing, the fork–join model is a way of setting up and executing parallel programs, such that execution branches off in parallel at designated points in the program, to "join" (merge) at a subsequent point and resume sequential execution.
The fork/join framework was designed to speed up the execution of tasks that can be divided into other smaller subtasks, executing them in parallel and then combining their results to get a single one.
ForkJoinPool is an implementation of ExecutorService that manages worker threads and provides tools for inspecting the thread pool's state and performance. A worker thread can execute only one task at a time, but the ForkJoinPool doesn't create a separate thread for every single subtask.
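To make the fork/join description above concrete, here is a minimal Java sketch. The SumTask class, its THRESHOLD cutoff, and the use of the common pool are illustrative assumptions, not anything taken from the question; the point is only to show one job being split into subtasks whose results are joined.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class ForkJoinSum {
    // Hypothetical task: sums data[from, to) by splitting the range in half.
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 1_000; // assumed cutoff for going sequential
        private final long[] data;
        private final int from;
        private final int to;

        SumTask(long[] data, int from, int to) {
            this.data = data;
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                // Small enough: compute directly without forking.
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            SumTask left = new SumTask(data, from, mid);
            SumTask right = new SumTask(data, mid, to);
            left.fork();                     // fork: schedule the left half asynchronously
            long rightSum = right.compute(); // work on the right half in this thread
            return left.join() + rightSum;   // join: wait for the left half and combine
        }
    }

    static long parallelSum(long[] data) {
        return ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        System.out.println(parallelSum(data)); // prints 50005000
    }
}
```

Computing one half in the current thread while the other is forked keeps the worker busy instead of blocking it immediately on join.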
In practice, ExecutorService is usually used to process many independent requests (i.e. transactions) concurrently, and fork-join when you want to accelerate one coherent job. Fork-join solves a specific type of problem. If you don't have this type of problem, use a plain ExecutorService, which is the interface ForkJoinPool itself implements anyway.
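For the "many independent requests" case, a plain ExecutorService can be sketched as follows. The process method, the string tickers, and the pool size of 4 are hypothetical stand-ins for the per-security work in the question; invokeAll plays the role of the latch by blocking until every task has finished.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class IndependentTasks {
    // Hypothetical stand-in for the I/O and processing done per security.
    static String process(String ticker) {
        return ticker + ":processed";
    }

    static List<String> processAll(List<String> tickers)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(4); // assumed pool size
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String ticker : tickers) {
                tasks.add(() -> process(ticker));
            }
            // invokeAll blocks until all tasks are done and returns the
            // futures in the same order as the submitted tasks.
            List<String> results = new ArrayList<>();
            for (Future<String> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processAll(Arrays.asList("AAA", "BBB", "CCC")));
        // prints [AAA:processed, BBB:processed, CCC:processed]
    }
}
```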
val futureResult = Future.sequence(
  portfolio.getSecurities.toList map { security =>
    (actorOf[SecurityProcessor].start() ? security).mapTo[Security]
  }
) map { securities => MergeHelper.merge(portfolio, securities) }
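For comparison with the Java approach mentioned in the question, the same "sequence the futures, then map over the combined result" shape can be sketched with CompletableFuture. This is plain Java, not Akka, and the process and merge methods are hypothetical placeholders for SecurityProcessor and MergeHelper.merge.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class SequenceThenMerge {
    // Hypothetical stand-in for what SecurityProcessor does with one security.
    static String process(String ticker) {
        return ticker + ":processed";
    }

    // Hypothetical stand-in for MergeHelper.merge.
    static String merge(List<String> processed) {
        return String.join(",", processed);
    }

    static CompletableFuture<String> processPortfolio(List<String> tickers) {
        // Start one asynchronous computation per security.
        List<CompletableFuture<String>> futures = tickers.stream()
                .map(ticker -> CompletableFuture.supplyAsync(() -> process(ticker)))
                .collect(Collectors.toList());

        // allOf completes when every future has completed; the joins inside
        // thenApply therefore do not block. Order follows the input list.
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()))
                .thenApply(SequenceThenMerge::merge);
    }

    public static void main(String[] args) {
        System.out.println(processPortfolio(Arrays.asList("AAA", "BBB")).join());
        // prints AAA:processed,BBB:processed
    }
}
```

As in the Scala version, nothing blocks until the caller decides to wait on the final future.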