I am trying to implement a producer-consumer program in Scala without using a Queue. Since an Actor already has a mailbox (a "mail queue" of sorts), I think writing a queue again would be redundant.
I tried to write the program purely with Actors. Below is a multiple-producer, multiple-consumer program. Each producer sleeps for a while to simulate doing some work. Consumers do not sleep at all.
However, I don't know how to shut down the program unless I add a supervisor actor to monitor the consumers, plus a Promise object so that I can use Await (see the Supervisor class in the code).
Is there any way to get rid of them?
import akka.actor.Actor.Receive
import akka.actor._
import akka.routing._
import akka.util._
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

class Producer(val pool: ActorRef)(val name: String) extends Actor {
  def receive = {
    case _ =>
      while (true) {
        val sleepTime = scala.util.Random.nextInt(1000)
        Thread.sleep(sleepTime)
        println("Producer %s send food" format name)
        pool ! name
      }
  }
}

class Consumer(supervisor: ActorRef)(val name: String) extends Actor {
  var counter = 0
  def receive = {
    case s =>
      counter += 1
      println("%s eat food produced by %s" format (name, s))
      if (counter >= 10) {
        println("%s is full" format name)
        context.stop(self)
        supervisor ! 1
      }
  }
}

class Supervisor(p: Promise[String]) extends Actor {
  var r = 3
  def receive = {
    case _ =>
      r -= 1
      if (0 == r) {
        println("All consumer stopped")
        context.stop(self)
        p success ("Good")
      }
  }
}

object Try3 {
  def work(): Unit = {
    val system = ActorSystem("sys1")
    val nProducer = 5
    val nConsumer = 3
    val p = Promise[String]
    val supervisor = system.actorOf(Props(new Supervisor(p)))
    val arrConsumer = for (i <- 1 to nConsumer) yield system.actorOf(Props(new Consumer(supervisor)("Consumer %d" format (i))))
    val poolConsumer = system.actorOf(Props.empty.withRouter(RoundRobinRouter(arrConsumer)))
    val arrProducer = for (i <- 1 to nProducer) yield system.actorOf(Props(new Producer(poolConsumer)("Producer %d" format (i))))
    arrProducer foreach (_ ! "start")
    Await.result(p.future, Duration.Inf)
    println("great!")
    system.shutdown
  }

  def main(args: Array[String]): Unit = {
    work()
  }
}
The receive function of the Producer class has a problem: it can never be shut down, because it is a while loop without a breaking condition.
The only way I can think of is to have the producer send a message to itself. I wonder whether this is the normal way to implement this kind of requirement?
Here is the modified code:
class Producer(val pool: ActorRef)(val name: String) extends Actor {
  // original implementation:
  // def receive = {
  //   case _ =>
  //     while (true) {
  //       val sleepTime = scala.util.Random.nextInt(1000)
  //       Thread.sleep(sleepTime)
  //       println("Producer %s send food" format name)
  //       pool ! name
  //     }
  // }
  case object Loop
  def receive = {
    case _ =>
      val sleepTime = scala.util.Random.nextInt(1000)
      Thread.sleep(sleepTime)
      println("Producer %s send food" format name)
      pool ! name
      self ! Loop // send message to itself
  }
}
Regardless of my implementation, what is the correct way to implement a producer-consumer program in Scala, with Actors or with Future/Promise?
You should never block (in your case, Thread.sleep and the while loop) inside an actor. Blocking inside an actor hogs a thread from the thread pool shared among all the actors. Even a small number of Producers like yours can starve the other actors in the ActorSystem of threads and render them unusable.
Instead, use a Scheduler to send a message periodically from your Producer.
override def preStart(): Unit = {
  import scala.concurrent.duration._
  import context.dispatcher
  context.system.scheduler.schedule(
    initialDelay = 0.seconds,
    interval = 1.second,
    receiver = pool,
    message = name
  )
}
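For context, here is a minimal sketch of how that preStart could sit inside the whole Producer class. It assumes the same older Akka 2.x scheduler API used above (newer Akka releases deprecate schedule in favor of scheduleWithFixedDelay). The tick field and the postStop cancellation are my additions for illustration, not part of the snippet above; cancelling the returned Cancellable is one way to make the producer stop sending once the actor itself is stopped.

```scala
import akka.actor.{Actor, ActorRef, Cancellable}
import scala.concurrent.duration._

// Sketch only: a Producer that schedules its sends instead of sleeping.
class Producer(val pool: ActorRef)(val name: String) extends Actor {
  import context.dispatcher // execution context used by the scheduler

  // Hypothetical field: handle to the recurring send, kept so it can be cancelled.
  private var tick: Option[Cancellable] = None

  override def preStart(): Unit = {
    // Send `name` to the consumer pool once per second, starting immediately.
    tick = Some(context.system.scheduler.schedule(
      initialDelay = 0.seconds,
      interval = 1.second,
      receiver = pool,
      message = name
    ))
  }

  // Assumption: the producer should stop sending when the actor is stopped.
  override def postStop(): Unit = tick.foreach(_.cancel())

  // The producer no longer needs a "start" message, so it handles nothing.
  def receive = Actor.emptyBehavior
}
```

With this version the `arrProducer foreach (_ ! "start")` line in your main program is no longer needed, and no actor thread is ever blocked by a sleep.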