Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What is the correct way to implement Producer Consumer in scala

I try to implement a Producer Consumer program in scala without employing Queue. Because I think Actor has already implemented "mail queue" or something else, it would be redundant to write the code again.

I tried to write the program in Actor purely. Below is a multiple producer multiple consumer program. Producer sleep for a while for simulating doing something. Consumer do not sleep at all.

However I don't know how to shutdown the program if I do not add a supervisor actor to monitor consumers, as well as a Promise object for using "Await"(Supervisor class in the code)

Is there anyway to get rid of them?

import akka.actor.Actor.Receive
import akka.actor._
import akka.routing._;
import akka.util._

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

class Producer(val pool:ActorRef)(val name:String) extends Actor {

  def receive = {
    case _ =>
      while (true) {
        val sleepTime = scala.util.Random.nextInt(1000)
        Thread.sleep(sleepTime)
        println("Producer %s send food" format name)
        pool ! name
      }
  }
}

class Consumer(supervisor : ActorRef)(val name:String) extends Actor {

  var counter = 0

  def receive = {
    case s => 
      counter += 1
      println("%s eat food produced by %s" format (name,s))

      if (counter >= 10) {
        println("%s is full" format name)

        context.stop(self)
        supervisor ! 1
      }
  }
}

class Supervisor(p:Promise[String]) extends Actor {

  var r = 3

  def receive = {
    case _ =>
      r -= 1
      if (0 == r) {
        println("All consumer stopped")
        context.stop(self)
        p success ("Good")
      }
  }

}

object Try3 {

  def work(): Unit = {
    val system = ActorSystem("sys1")
    val nProducer = 5;
    val nConsumer = 3;
    val p = Promise[String]
    val supervisor = system.actorOf(Props(new Supervisor(p)));
    val arrConsumer = for ( i <-  1 to nConsumer) yield system.actorOf( Props( new Consumer(supervisor)( "Consumer %d" format (i) ) ) )
    val poolConsumer = system.actorOf(Props.empty.withRouter( RoundRobinRouter(arrConsumer) ))
    val arrProducer = for ( i <-  1 to nProducer) yield system.actorOf( Props( new Producer(poolConsumer)( "Producer %d" format (i) ) ) )

    arrProducer foreach (_ ! "start")

    Await.result(p.future,Duration.Inf)
    println("great!")
    system.shutdown
  }

  def main(args:Array[String]): Unit = {
    work()
  }
}

The receive function Producer class has a problem that it won't be shut down because it is while without breaking condition.

The only way I can think of is "sending a message to producer itself". I wonder is it the normal way to implement this kind of request?

Here is the modified code:

class Producer(val pool:ActorRef)(val name:String) extends Actor {

  //  original implementation:
  //  def receive = {
  //    case _ =>
  //    while (true){
  //      val sleepTime = scala.util.Random.nextInt(1000)
  //      Thread.sleep(sleepTime)
  //      println("Producer %s send food" format name)
  //      pool ! name
  //    }
  //  }

  case object Loop;

  def receive = {
    case _ =>
      val sleepTime = scala.util.Random.nextInt(1000)
      Thread.sleep(sleepTime)
      println("Producer %s send food" format name)
      pool ! name
      self ! Loop   //send message to itself
  }
}

Regardless of my implementation, what is the correct way to implement Producer Consumer program in scala, with Actor or Future/Promise?

like image 667
worldterminator Avatar asked Oct 10 '14 06:10

worldterminator


1 Answers

You should never block (in your case Thread.sleep, while loop) inside an actor. Blocking inside an actor hogs a thread from a thread pool used among all the actors. Even a small amount of Producers like yours would make all the actor in the ActorSystem deprived from threads and render them unusable.

Instead use a Scheduler to schedule a meesage send periodically in your Producer.

override def preStart(): Unit = {
  import scala.concurrent.duration._
  import context.dispatcher
  context.system.scheduler.schedule(
    initialDelay = 0.seconds,
    interval = 1.second,
    receiver = pool,
    message = name
  )
}
like image 60
dvim Avatar answered Nov 18 '22 02:11

dvim