Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Processing concurrently in Scala

As in my own answer to my own question, I have the situation whereby I am processing a large number of events which arrive on a queue. Each event is handled in exactly the same manner and each even can be handled independently of all other events.

My program takes advantage of the Scala concurrency framework and many of the processes involved are modelled as Actors. As Actors process their messages sequentially, they are not well-suited to this particular problem (even though my other actors are performing actions which are sequential). As I want Scala to "control" all thread creation (which I assume is the point of it having a concurrency system in the first place) it seems I have 2 choices:

  1. Send the events to a pool of event processors, which I control
  2. get my Actor to process them concurrently by some other mechanism

I would have thought that #1 negates the point of using the actors subsystem: how many processor actors should I create? being one obvious question. These things are supposedly hidden from me and solved by the subsystem.

My answer was to do the following:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //I want to be able to handle multiple events at the same time
        //create a new actor to handle it
        actor {
          //processing code here
          process(x)
        }
    }
  }
}

Is there a better approach? Is this incorrect?

edit: A possibly better approach is:

val eventProcessor = actor {
  loop {
    react {
      case MyEvent(x) =>
        //Pass processing to the underlying ForkJoin framework
        Scheduler.execute(process(e))
    }
  }
}
like image 822
oxbow_lakes Avatar asked Jun 17 '09 14:06

oxbow_lakes


People also ask

What is Concurrent Programming in Scala?

Concurrency is when more than one task can start and complete in overlapping time periods. It doesn't matter whether they're running at the same instant. You can write concurrent programs on a single CPU (single execution core) machine where only one task can execute at a given point of time.

Does Scala support concurrent programming?

Although Scala is still a language on the rise that has yet to receive the wide-scale adoption of a language such as Java, its support for concurrent programming is rich and powerful.

Is Scala good for concurrency?

Scala got a fame for being good for concurrency because it is a functional language and because of its actors library. Functional languages are good for concurrency because they focus on immutability, which helps concurrent algorithms.

Is Scala multithreaded?

Scala supports multithreading, which means we can execute multiple threads at once. We can perform multiple operations independently and achieve multitasking. This lets us develop concurrent applications.


1 Answers

This seems like a duplicate of another question. So I'll duplicate my answer

Actors process one message at a time. The classic pattern to process multiple messages is to have one coordinator actor front for a pool of consumer actors. If you use react then the consumer pool can be large but will still only use a small number of JVM threads. Here's an example where I create a pool of 10 consumers and one coordinator to front for them.

import scala.actors.Actor
import scala.actors.Actor._

case class Request(sender : Actor, payload : String)
case class Ready(sender : Actor)
case class Result(result : String)
case object Stop

def consumer(n : Int) = actor {
  loop {
    react {
      case Ready(sender) => 
        sender ! Ready(self)
      case Request(sender, payload) =>
        println("request to consumer " + n + " with " + payload)
        // some silly computation so the process takes awhile
        val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
        sender ! Result(result)
        println("consumer " + n + " is done processing " + result )
      case Stop => exit
    }
  }
}

// a pool of 10 consumers
val consumers = for (n <- 0 to 10) yield consumer(n)

val coordinator = actor {
  loop {
     react {
        case msg @ Request(sender, payload) =>
           consumers foreach {_ ! Ready(self)}
           react {
              // send the request to the first available consumer
              case Ready(consumer) => consumer ! msg
           }
         case Stop => 
           consumers foreach {_ ! Stop} 
           exit
     }
  }
}

// a little test loop - note that it's not doing anything with the results or telling the coordinator to stop
for (i <- 0 to 1000) coordinator ! Request(self, i.toString)

This code tests to see which consumer is available and sends a request to that consumer. Alternatives are to just randomly assign to consumers or to use a round robin scheduler.

Depending on what you are doing, you might be better served with Scala's Futures. For instance, if you don't really need actors then all of the above machinery could be written as

import scala.actors.Futures._

def transform(payload : String) = {      
  val result = ((payload + payload + payload) map {case '0' => 'X'; case '1' => "-"; case c => c}).mkString
  println("transformed " + payload + " to " + result )
  result
}

val results = for (i <- 0 to 1000) yield future(transform(i.toString))
like image 80
James Iry Avatar answered Oct 10 '22 01:10

James Iry