As in my own answer to my own question, I have a situation where I am processing a large number of events which arrive on a queue. Each event is handled in exactly the same manner and each event can be handled independently of all other events.
My program takes advantage of the Scala concurrency framework and many of the processes involved are modelled as Actors. As Actors process their messages sequentially, they are not well-suited to this particular problem (even though my other actors are performing actions which are sequential). As I want Scala to "control" all thread creation (which I assume is the point of it having a concurrency system in the first place) it seems I have 2 choices:
1. Send the events to a pool of processor actors that I create and manage myself
2. Get my Actor to process them concurrently by some other mechanism
I would have thought that #1 negates the point of using the actors subsystem: "how many processor actors should I create?" being one obvious question. These things are supposedly hidden from me and solved by the subsystem.
My answer was to do the following:
import scala.actors.Actor._

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        // I want to be able to handle multiple events at the same time,
        // so create a new actor to handle each one
        actor {
          // processing code here
          process(x)
        }
    }
  }
}
Is there a better approach? Is this incorrect?
edit: A possibly better approach is:
import scala.actors.Actor._
import scala.actors.Scheduler

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        // Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(x))
    }
  }
}
Concurrency is when more than one task can start and complete in overlapping time periods. It doesn't matter whether they're running at the same instant. You can write concurrent programs on a single CPU (single execution core) machine where only one task can execute at a given point of time.
Although Scala is still a language on the rise that has yet to receive the wide-scale adoption of a language such as Java, its support for concurrent programming is rich and powerful.
Scala has a reputation for being good at concurrency because it supports functional programming and because of its actors library. Functional languages suit concurrency because they emphasize immutability, which reduces the shared mutable state that concurrent algorithms have to protect.
Scala supports multithreading, which means we can execute multiple threads at once. We can perform multiple operations independently and achieve multitasking. This lets us develop concurrent applications.
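As a rough illustration of running several operations independently, here is a minimal sketch that uses plain JVM threads from Scala. ThreadSketch and runAll are hypothetical names made up for this example, not part of any library.

```scala
object ThreadSketch {
  // Hypothetical helper: runs each task on its own JVM thread
  // and blocks until all of them have finished.
  def runAll(tasks: Seq[() => Unit]): Unit = {
    val threads = tasks.map { task =>
      new Thread(new Runnable { def run(): Unit = task() })
    }
    threads.foreach(_.start()) // start every thread before waiting on any
    threads.foreach(_.join())  // wait for all of them to complete
  }
}
```

The tasks may or may not run at the same instant, depending on the number of cores; runAll only guarantees that they have all completed when it returns.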
This seems like a duplicate of another question, so I'll duplicate my answer.
Actors process one message at a time. The classic pattern to process multiple messages is to have one coordinator actor front for a pool of consumer actors. If you use react then the consumer pool can be large but will still only use a small number of JVM threads. Here's an example where I create a pool of 10 consumers and one coordinator to front for them.
import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) =>
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes a while
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => '-'; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 until 10) yield consumer(n)

val coordinator = actor {
  loop {
    react {
      case msg @ Request(sender, payload) =>
        consumers foreach {_ ! Ready(self)}
        react {
          // send the request to the first available consumer
          case Ready(consumer) => consumer ! msg
        }
      case Stop =>
        consumers foreach {_ ! Stop}
        exit
    }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)
This code tests to see which consumer is available and sends a request to that consumer. Alternatives are to just randomly assign to consumers or to use a round robin scheduler.
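For the round robin alternative, a minimal sketch of the selection logic might look like the following. RoundRobin is a hypothetical helper written for this illustration, not something provided by scala.actors.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical helper, not part of any library: hands out the elements
// of `items` in rotation, wrapping around at the end.
final class RoundRobin[A](items: IndexedSeq[A]) {
  require(items.nonEmpty, "need at least one item to rotate over")
  private val counter = new AtomicLong(0L)

  // Each call atomically claims the next position in the rotation.
  def next(): A = {
    val i = (counter.getAndIncrement() % items.size).toInt
    items(i)
  }
}
```

Assuming a value such as val pool = new RoundRobin(consumers), the coordinator could handle a request with pool.next() ! msg instead of the Ready handshake. The trade-off is that a request may queue behind a busy consumer while another one sits idle.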
Depending on what you are doing, you might be better served with Scala's Futures. For instance, if you don't really need actors then all of the above machinery could be written as
import scala.actors.Futures._

def transform(payload : String) = {
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => '-'; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))
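On Scala 2.10 or later, the same idea could also be sketched with scala.concurrent.Future from the standard library rather than scala.actors.Futures. FutureSketch and transformAll are hypothetical names for this example, and using the global execution context is an assumption; a real application might supply its own.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureSketch {
  // Same silly computation as above: triple the payload and swap two characters.
  def transform(payload: String): String =
    (payload * 3).map {
      case '0' => 'X'
      case '1' => '-'
      case c   => c
    }

  // Start one future per payload; the execution context decides how many
  // threads actually run them. The combined future keeps the input order.
  def transformAll(payloads: Seq[String]): Future[Seq[String]] =
    Future.sequence(payloads.map(p => Future(transform(p))))
}
```

Something like Await.result(FutureSketch.transformAll((0 to 1000).map(_.toString)), 10.seconds) would block until every result is available; in real code you would more likely attach a callback or compose the futures further.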