Suppose I have to ехеcute several CPU-bound tasks. If I have 4 CPUs, for example, I would probably create a fixed-size thread pool of 4-5 worker threads waiting on a queue and put the tasks in the queue. In Java I can use java.util.concurrent
(maybe ThreadPoolExecutor
) to implement this mechanism.
How would you implement it with Scala actors?
All actors are basically threads which are executed by a scheduler under the hood. The scheduler creates a thread pool to execute actors roughly bound to your number of cores. This means that you can just create an actor per task you need to execute and leave the rest to Scala:
for(i <- 1 to 20) {
actor {
print(i);
Thread.sleep(1000);
}
}
The disadvantage here is depending on the number of tasks, the cost of creating a thread for each task may be quite expensive since threads are not so cheap in Java.
A simple way to create a bounded pool of worker actors and then distribute the tasks to them via messaging would be something like:
import scala.actors.Actor._
val numWorkers = 4
val pool = (1 to numWorkers).map { i =>
actor {
loop {
react {
case x: String => println(x)
}
}
}
}
for(i <- 1 to 20) {
val r = (new util.Random).nextInt(numWorkers)
pool(r) ! "task "+i
}
The reason we want to create multiple actors is because a single actor processes only one message (i.e. task) at a time so to get parallelism for your tasks you need to create multiple.
A side note: the default scheduler becomes particularly important when it comes to I/O bound tasks, as you will definitely want to change the size of the thread pool in that case. Two good blog posts which go into details about this are: Explore the Scheduling of Scala Actors and Scala actors thread pool pitfall.
With that said, Akka is an Actor framework that provides tools for more advanced workflows with Actors, and it is what I would use in any real application. Here is a load balancing (rather than random) task executor:
import akka.actor.Actor
import Actor._
import akka.routing.{LoadBalancer, CyclicIterator}
class TaskHandler extends Actor {
def receive = {
case t: Task =>
// some computationally expensive thing
t.execute
case _ => println("default case is required in Akka...")
}
}
class TaskRouter(numWorkers: Int) extends Actor with LoadBalancer {
val workerPool = Vector.fill(numWorkers)(actorOf[TaskHandler].start())
val seq = new CyclicIterator(workerPool)
}
val router = actorOf(new TaskRouter(4)).start()
for(i <- 1 to 20) {
router ! Task(..)
}
You can have different types of Load Balancing (CyclicIterator is round-robin distribution), so you can check the docs here for more info.
Well, you usually don't. Part of the attraction of using actors is that they handle such details for you.
If, however, you insist on managing that, you'll need to override the protected scheduler
method on your Actor
class to return an appropriate IScheduler
. See also the scala.actors.scheduler
package, and the comments on the Actor
trait concerning schedulers.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With