Scala: Akka actor not dying in Play Framework 2.2.0

I'm building an example with Play Framework 2.2.0 (Scala) that uses WebSockets to stream data to a client. The problem is that one of the children of a parent Actor is not being shut down properly. All the logs indicate that it is stopping and has stopped, but when I publish data to it I can see that it is still alive. Here's some code, starting with my controller action:

def scores(teamIds: String) = WebSocket.async[JsValue] { request =>
  val teamIdsArr: Array[String] = teamIds.split(",").distinct.map { el =>
    s"nfl-streaming-scores-${el}"
  }

  val scoresStream = Akka.system.actorOf(Props(new ScoresStream(teamIdsArr)))
  ScoresStream.join(scoresStream)
}

So every time a client connects, it joins ScoresStream, which returns the (Iteratee, Enumerator) pair that WebSocket.async requires. The ScoresStream object looks like this:

object ScoresStream {

  implicit val timeout = Timeout(5 seconds)

  def join(scoresStream:ActorRef):scala.concurrent.Future[(Iteratee[JsValue,_],Enumerator[JsValue])] = {

    (scoresStream ? BeginStreaming).map {

      case Connected(enumerator) => 
        val iteratee = Iteratee.foreach[JsValue] { _ => 
          Logger.info("Ignore iteratee input.")
        }.map { _ => 
          Logger.info("Client quitting - killing Actor.")
          scoresStream ! UnsubscribeAll
          scoresStream ! PoisonPill
        }
        (iteratee, enumerator)
    }
  }
}

The idea here is to kill the main Actor, ScoresStream, when the client disconnects. I do that by using scoresStream ! PoisonPill.

ScoresStream in turn creates Pub and Sub instances, which are wrappers that connect to Redis for publishing and subscribing to messages. Here's the Actor code:

class ScoresStream(teamIds: Array[String]) extends Actor with CreatePubSub with akka.actor.ActorLogging {

  val (scoresEnumerator, scoresChannel) = Concurrent.broadcast[JsValue]

  case class Message(kind: String, user: String, message: String)
  implicit val messageReads = Json.reads[Message]
  implicit val messageWrites = Json.writes[Message]

  val sub = context.child("sub") match {
    case None    => createSub(scoresChannel)
    case Some(c) => c
  }

  val pub = context.child("pub") match {
     case None     => createPub(teamIds)
     case Some(c)  => c
  }

  def receive = {
    case BeginStreaming => {
      log.info("hitting join...")
      sub ! RegisterCallback
      sub ! SubscribeChannel(teamIds)
      sender ! Connected(scoresEnumerator)
    }

    case UnsubscribeAll => {
      sub ! UnsubscribeChannel(teamIds)
    }
  }

}

trait CreatePubSub { self:Actor =>
  def createSub(pChannel: Concurrent.Channel[JsValue]) = context.actorOf(Props(new Sub(pChannel)), "sub")
  def createPub(teamIds: Array[String]) = context.actorOf(Props(new Pub(teamIds)), "pub")
}

Finally, here's the Sub Actor code (Pub doesn't seem relevant here, as it shuts down fine):

class Sub(pChannel: Concurrent.Channel[JsValue]) extends Actor with CreatePublisherSubscriber with ActorLogging {
  val s = context.child("subscriber") match {
    case None    => createSubscriber
    case Some(c) => c
  }

  def callback(pubsub: PubSubMessage) = pubsub match {
    case E(exception) => println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup")
    case S(channel, no) => println("subscribed to " + channel + " and count = " + no)
    case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no)
    case M(channel, msg) => 
      msg match {
        // exit will unsubscribe from all channels and stop subscription service
        case "exit" => 
          println("unsubscribe all ..")
          pChannel.end
          r.unsubscribe

        // message "+x" will subscribe to channel x
        case x if x startsWith "+" => 
          val s: Seq[Char] = x
          s match {
            case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => }
          }

        // message "-x" will unsubscribe from channel x
        case x if x startsWith "-" => 
          val s: Seq[Char] = x
          s match {
            case Seq('-', rest @ _*) => r.unsubscribe(rest.toString)
                                        pChannel.end
          }

        case x => 
         try {
            log.info("Just got a message: " + x)
            pChannel.push(Json.parse(x))
          } 
          catch {
            case ex: com.fasterxml.jackson.core.JsonParseException => {
              log.info("Malformed JSON sent.")
            }
          }
      }
  }

  def receive = {
    case RegisterCallback => {
      log.info("Creating a subscriber and registering callback")  
      s ! Register(callback)
    }
    case SubscribeChannel(teamIds) => {
      teamIds.foreach { x => log.info("subscribing to channel " + x + " ") }
      //sub ! Subscribe(Array("scores-5","scores-6"))
      s ! Subscribe(teamIds)
    }
    case UnsubscribeChannel(teamIds) => {
      teamIds.foreach { x => log.info("unsubscribing from channel " + x + " ") }
      s ! Unsubscribe(teamIds)
    }
    case true => println("Subscriber successfully received message.")
    case false => println("Something went wrong.")
  }
}

trait CreatePublisherSubscriber { self:Actor =>
  def r = new RedisClient("localhost", 6379)
  def createSubscriber = context.actorOf(Props(new Subscriber(r)), "subscriber")
  def createPublisher = context.actorOf(Props(new Publisher(r)), "publisher")
}

Now when a client connects, the startup messages look healthy:

[DEBUG] [10/20/2013 00:35:53.618] [application-akka.actor.default-dispatcher-12] [akka://application/user] now supervising Actor[akka://application/user/$c#-54456921]
[DEBUG] [10/20/2013 00:35:53.619] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] started (com.example.stream.models.ScoresStream@131a9310)
[DEBUG] [10/20/2013 00:35:53.620] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] now supervising Actor[akka://application/user/$c/sub#1376180991]
[DEBUG] [10/20/2013 00:35:53.621] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/pub/publisher] started (com.redis.Publisher@3b34c0a6)
[DEBUG] [10/20/2013 00:35:53.622] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub/subscriber] started (com.redis.Subscriber@453f0a8)
Subscriber successfully received message.
Subscriber successfully received message.
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-19] [akka://application/user/$c/sub] started (com.example.stream.models.Sub@6165ab39)
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-19] [akka://application/user/$c/sub] now supervising Actor[akka://application/user/$c/sub/subscriber#-1562348862]
subscribed to nfl-streaming-scores-5 and count = 1
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] now supervising Actor[akka://application/user/$c/pub#-707418539]
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] hitting join...
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-23] [akka://application/user/$c/sub] Creating a subscriber and registering callback
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-23] [akka://application/user/$c/sub] subscribing to channel nfl-streaming-scores-5 
[DEBUG] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-18] [akka://application/user/$c/pub] started (com.example.stream.models.Pub@48007a17)
[DEBUG] [10/20/2013 00:35:53.703] [application-akka.actor.default-dispatcher-18] [akka://application/user/$c/pub] now supervising Actor[akka://application/user/$c/pub/publisher#1509054514]

And disconnecting looks healthy:

[info] application - Client quitting - killing Actor.
unsubscribed from nfl-streaming-scores-5 and count = 0
[DEBUG] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] received AutoReceiveMessage Envelope(PoisonPill,Actor[akka://application/deadLetters])
[INFO] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/sub] unsubscribing from channel nfl-streaming-scores-5 
[DEBUG] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] stopping
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/sub] stopping
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/pub/publisher] stopped
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub/subscriber] stopped
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub] stopped
[INFO] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub] Message [java.lang.Boolean] from Actor[akka://application/user/$c/sub/subscriber#-1562348862] to Actor[akka://application/user/$c/sub#1376180991] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-26] [akka://application/user/$c/pub] stopping
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-26] [akka://application/user/$c/pub] stopped
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] stopped

And here's the problem. After the client has disconnected, I publish a message to a channel that the now-stopped Actor was subscribed to:

redis-cli publish "nfl-streaming-scores-5" "{\"test\":\"message\"}"

And here it is showing up when it shouldn't, since this Actor should be dead. Other Actors that existed earlier (the ones labeled $a and $b) also receive the message. I can confirm that no other clients are connected.

[INFO] [10/20/2013 00:38:33.097] [Thread-7] [akka://application/user/$c/sub] Just got a message: {"test":"message"}

Another odd indicator is that actor names never get reused. Each time I disconnect and reconnect, a new name is spawned, like this:

akka://application/user/$c
akka://application/user/$d
akka://application/user/$e

I never see old names get reused.

My assumption is that the connection to Redis is not being closed cleanly. That doesn't explain why the logs say the Actor has stopped while it apparently still exists, but netstat definitely shows established connections to Redis even after all Actors are presumably dead. When I stop the application completely, those connections go away. It's as if unsubscribe is silently failing, which keeps both the Actor and the connection alive. That is bad for several reasons: eventually the system will run out of file descriptors, leak memory, or both. Is there something obvious that I'm doing wrong?

randombits asked Oct 21 '22 22:10



1 Answer

Stopping an actor does not mean that the resources it owns are cleaned up automatically. If a RedisClient is tied to that actor instance and its connection must be closed explicitly, then you should do that in the postStop method. I also agree with @Schleichardt that you should change your def r = new RedisClient into a val or a lazy val (depending on initialization order and needs). As a def, every reference to r creates a new RedisClient; with a val, you know that there is only a single RedisClient per subscriber instance to clean up. I don't know the API of the RedisClient you are using, but let's say it has a shutdown method that terminates its connection and cleans up its resources. Then you can simply add a postStop to the subscriber actor, like so:

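// shutdown is a placeholder: use whatever close/disconnect method your RedisClient provides
override def postStop(): Unit = {
  r.shutdown
}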
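
To illustrate why the owner being stopped is not enough, here is a plain-Scala sketch with no Akka or Redis involved. FakeSubscription and Owner are hypothetical names invented for this example; FakeSubscription stands in for any client that delivers messages to a callback from its own thread, which would be consistent with the [Thread-7] tag on the late log line in the question. The callback keeps firing until the subscription itself is closed, which is the job a postStop hook would do.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a pub/sub connection (not the real RedisClient):
// a background thread hands published messages to a callback until close().
final class FakeSubscription(callback: String => Unit) {
  private val inbox = new LinkedBlockingQueue[String]()
  @volatile private var running = true

  private val consumer = new Thread(new Runnable {
    def run(): Unit =
      while (running) {
        val msg = inbox.poll(10, TimeUnit.MILLISECONDS)
        if (msg != null) callback(msg)
      }
  })
  consumer.setDaemon(true)
  consumer.start()

  def publish(msg: String): Unit = inbox.put(msg)

  // Stops the consumer thread and waits for it to finish.
  def close(): Unit = {
    running = false
    consumer.join()
  }
}

// Hypothetical owner playing the role of the actor.
final class Owner {
  val received = new AtomicInteger(0)
  @volatile var stopped = false
  val subscription =
    new FakeSubscription((_: String) => { received.incrementAndGet(); () })

  // Marks the owner as stopped but leaves the subscription thread running.
  def stopWithoutCleanup(): Unit = { stopped = true }

  // Analogue of a postStop override: releases the owned resource as well.
  def stopWithCleanup(): Unit = { stopped = true; subscription.close() }
}
```

After stopWithoutCleanup(), a published message still reaches the callback and increments received. After stopWithCleanup(), nothing is consuming any more, so later messages are never delivered.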

Assuming you make the def-to-val change, a postStop override along these lines may be what you are looking for.
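
The def-versus-val point can be checked in isolation. The sketch below uses a hypothetical FakeClient (not the real RedisClient) that counts how many instances are created. With def r, every access builds a fresh client, so calling shutdown on r closes a brand-new instance rather than the one already in use. With lazy val r, there is exactly one instance to close.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Counts how many hypothetical clients have been constructed.
object Connections { val opened = new AtomicInteger(0) }

// Hypothetical stand-in for a connection-holding client.
final class FakeClient {
  Connections.opened.incrementAndGet()
  @volatile private var open = true
  def shutdown(): Unit = { open = false }
  def isOpen: Boolean = open
}

// Mirrors `def r = new RedisClient(...)`: a new client on every access.
class UsesDef { def r = new FakeClient }

// Mirrors the suggested fix: one client, created on first access.
class UsesLazyVal { lazy val r = new FakeClient }
```

For example, with val a = new UsesDef, the expression a.r.shutdown() shuts down a client that was created by that very expression, and any client obtained from an earlier a.r stays open. With UsesLazyVal, the same call closes the single shared instance.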

cmbaxter answered Nov 15 '22 06:11
