
How do I rewrite a for loop with a shared dependency using actors

Tags:

scala

akka

actor

We have some code which needs to run faster. It's already profiled, so we would like to make use of multiple threads. Usually I would set up an in-memory queue and have a number of threads taking jobs off the queue and calculating the results. For the shared data I would use a ConcurrentHashMap or similar.

I don't really want to go down that route again. From what I have read, using actors will result in cleaner code, and if I use Akka, migrating to more than one JVM should be easier. Is that true?

However, I don't know how to think in actors, so I am not sure where to start.

To give a better idea of the problem, here is some sample code:

case class Trade(price:Double, volume:Int, stock:String) {
  def value(priceCalculator:PriceCalculator) =
    (priceCalculator.priceFor(stock) - price) * volume
}
class PriceCalculator {
  def priceFor(stock:String) = {
    Thread.sleep(20)//a slow operation which can be cached
    50.0
  }
}
object ValueTrades {

  def valueAll(trades:List[Trade],
      priceCalculator:PriceCalculator):List[(Trade,Double)] = {
    trades.map { trade => (trade,trade.value(priceCalculator)) }
  }

  def main(args:Array[String]) {
    val trades = List(
      Trade(30.5, 10, "Foo"),
      Trade(30.5, 20, "Foo")
      //usually much longer
    )
    val priceCalculator = new PriceCalculator
    val values = valueAll(trades, priceCalculator)
  }

}

I'd appreciate it if someone with experience using actors could suggest how this would map onto actors.

Thomas Rynne asked Mar 22 '10 19:03


2 Answers

This is a complement to my comment on shared results for expensive calculations. Here it is:

import scala.actors._
import Actor._
import Futures._

case class PriceFor(stock: String) // Ask for result

// The following could be an "object" as well, if it's supposed to be singleton
class PriceCalculator extends Actor {
  val map = new scala.collection.mutable.HashMap[String, Future[Double]]()
  def act = loop {
    react {
      case PriceFor(stock) => reply(map getOrElseUpdate (stock, future {
        Thread.sleep(2000) // a slow operation
        50.0
      }))
    }
  }
}

Here's a usage example:

scala> val pc = new PriceCalculator; pc.start
pc: PriceCalculator = PriceCalculator@141fe06

scala> class Test(stock: String) extends Actor {
     |   def act = {
     |     println(System.currentTimeMillis().toString+": Asking for stock "+stock)
     |     val f = (pc !? PriceFor(stock)).asInstanceOf[Future[Double]]
     |     println(System.currentTimeMillis().toString+": Got the future back")
     |     val res = f.apply() // this blocks until the result is ready
     |     println(System.currentTimeMillis().toString+": Value: "+res)
     |   }
     | }
defined class Test

scala> List("abc", "def", "abc").map(new Test(_)).map(_.start)
1269310737461: Asking for stock abc
res37: List[scala.actors.Actor] = List(Test@6d888e, Test@1203c7f, Test@163d118)
1269310737461: Asking for stock abc
1269310737461: Asking for stock def
1269310737464: Got the future back

scala> 1269310737462: Got the future back
1269310737465: Got the future back
1269310739462: Value: 50.0
1269310739462: Value: 50.0
1269310739465: Value: 50.0


scala> new Test("abc").start // Should return instantly
1269310755364: Asking for stock abc
res38: scala.actors.Actor = Test@15b5b68
1269310755365: Got the future back

scala> 1269310755367: Value: 50.0
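
The same idea (hand back a future at once, and compute each price only once) can also be sketched without scala.actors, which has been deprecated since Scala 2.10. What follows is a minimal sketch using scala.concurrent from the standard library, not a translation of the actor above: CachingPriceCalculator, lookups and demo are hypothetical names, and a synchronized block stands in for the actor's one-message-at-a-time mailbox.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CachedPrices {
  // Hypothetical class name; stands in for the PriceCalculator actor above.
  class CachingPriceCalculator {
    // Counts how many slow lookups actually ran (illustration only).
    val lookups = new AtomicInteger(0)
    private val cache = mutable.HashMap.empty[String, Future[Double]]

    // Returns at once; the lock guards only the map, not the slow work.
    def priceFor(stock: String): Future[Double] = synchronized {
      cache.getOrElseUpdate(stock, Future {
        lookups.incrementAndGet()
        Thread.sleep(200) // stand-in for the slow operation
        50.0
      })
    }
  }

  // Requests three prices, two of them for the same stock.
  def demo(): (List[Double], Int) = {
    val calc = new CachingPriceCalculator
    val requests = List("abc", "def", "abc").map(calc.priceFor)
    val prices = Await.result(Future.sequence(requests), 5.seconds)
    (prices, calc.lookups.get)
  }
}
```

Calling CachedPrices.demo() returns the three prices plus the number of slow lookups that ran; the second request for "abc" reuses the future created by the first, so only two lookups are started.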
Daniel C. Sobral answered Oct 05 '22 13:10


For simple parallelization, where I throw a bunch of work out to process and then wait for it all to come back, I tend to like to use a Futures pattern.

class ActorExample {
  import actors._
  import Actor._
  class Worker(val id: Int) extends Actor {
    def busywork(i0: Int, i1: Int) = {
      var sum,i = i0
      while (i < i1) {
        i += 1
        sum += 42*i
      }
      sum
    }
    def act() { loop { react {
      case (i0:Int,i1:Int) => sender ! busywork(i0,i1)
      case None => exit()
    }}}
  }

  val workforce = (1 to 4).map(i => new Worker(i)).toList

  def parallelFourSums = {
    workforce.foreach(_.start())
    val futures = workforce.map(w => w !! ((w.id,1000000000)) );
    val computed = futures.map(f => f() match {
      case i:Int => i
      case _ => throw new IllegalArgumentException("I wanted an int!")
    })
    workforce.foreach(_ ! None)
    computed
  }

  def serialFourSums = {
    val solo = workforce.head
    workforce.map(w => solo.busywork(w.id,1000000000))
  }

  def timed(f: => List[Int]) = {
    val t0 = System.nanoTime
    val result = f
    val t1 = System.nanoTime
    (result, t1-t0)
  }

  def go {
    val serial = timed( serialFourSums )
    val parallel = timed( parallelFourSums )
    println("Serial result:  " + serial._1)
    println("Parallel result:" + parallel._1)
    printf("Serial took   %.3f seconds\n",serial._2*1e-9)
    printf("Parallel took %.3f seconds\n",parallel._2*1e-9)
  }
}

Basically, the idea is to create a collection of workers, one per workload, and then throw all the data at them with !!, which immediately gives back a future. When you try to read the future, the caller blocks until the worker is actually done with the data.

You could rewrite the above so that PriceCalculator extended Actor instead, and valueAll coordinated the return of the data.
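
As a rough illustration of valueAll doing the coordination, here is a sketch applied to the Trade example from the question. It swaps the actor for plain scala.concurrent futures so that it runs on current Scala versions, and it assumes priceFor is safe to call from several threads. The enclosing object name ParallelValueTrades is made up for this example; Trade and PriceCalculator follow the question, with the value calculation moved into valueAll.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelValueTrades {
  case class Trade(price: Double, volume: Int, stock: String)

  // Same slow lookup as in the question.
  class PriceCalculator {
    def priceFor(stock: String): Double = {
      Thread.sleep(20) // a slow operation which can be cached
      50.0
    }
  }

  def valueAll(trades: List[Trade],
      priceCalculator: PriceCalculator): List[(Trade, Double)] = {
    // Start one lookup per distinct stock; every trade in that stock shares it.
    val prices: Map[String, Future[Double]] =
      trades.map(_.stock).distinct
        .map(s => s -> Future(priceCalculator.priceFor(s)))
        .toMap
    // Each trade's value becomes available as soon as its price is.
    val valued = trades.map { t =>
      prices(t.stock).map(p => (t, (p - t.price) * t.volume))
    }
    // Coordinate the return: block once, here, for all results in input order.
    Await.result(Future.sequence(valued), 10.seconds)
  }
}
```

The 10 second timeout is an arbitrary choice for the sketch; a real caller would pick one that matches how slow the price lookup can get.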

Note that you have to be careful passing non-immutable data around.
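
One common way to stay safe here is to hand the worker an immutable snapshot rather than the mutable collection itself. The sketch below uses a scala.concurrent future as the worker instead of an actor; SnapshotExample and sumOfSnapshot are hypothetical names chosen for the illustration.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SnapshotExample {
  def sumOfSnapshot(): Int = {
    val buffer = ArrayBuffer(1, 2, 3)
    // Immutable copy: this is what gets handed to the worker.
    val snapshot: List[Int] = buffer.toList
    val worker = Future(snapshot.sum)
    // Later mutation of the buffer cannot change what the worker sees.
    buffer += 100
    Await.result(worker, 5.seconds)
  }
}
```

Had the future summed buffer directly, the result would depend on whether the append ran before, during, or after the summation.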

Anyway, on the machine I'm typing this from, if you run the above you get:

scala> (new ActorExample).go
Serial result:  List(-1629056553, -1629056636, -1629056761, -1629056928)
Parallel result:List(-1629056553, -1629056636, -1629056761, -1629056928)
Serial took   1.532 seconds
Parallel took 0.443 seconds

(Obviously I have at least four cores; the parallel timing varies rather a bit depending on which worker gets what processor and what else is going on on the machine.)

Rex Kerr answered Oct 05 '22 13:10