In my scenario I have 2 actors:
- watchee (I use TestProbe)
- watcher (Watcher wrapped into TestActorRef to expose some internal state I track in my test)
Watcher should take some actions when watchee dies.
Here is the complete test case I've written so far:
class TempTest(_system: ActorSystem) extends TestKit(_system) with ImplicitSender with FunSuiteLike with Matchers with BeforeAndAfterAll {
  def this() = this(ActorSystem("TempTest"))

  override def afterAll {
    TestKit.shutdownActorSystem(system)
  }

  class WatcherActor(watchee: ActorRef) extends Actor {
    var state = "initial"
    context.watch(watchee)

    override def receive: Receive = {
      case "start" =>
        state = "start"
      case _: Terminated =>
        state = "terminated"
    }
  }

  test("example") {
    val watchee = TestProbe()
    val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))
    assert(watcher.underlyingActor.state === "initial")
    watcher ! "start" // "start" will be sent and handled by watcher synchronously
    assert(watcher.underlyingActor.state === "start")
    system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
    Thread.sleep(100) // what is the best way to avoid blocking here?
    assert(watcher.underlyingActor.state === "terminated")
  }
}
Now, since all involved actors use CallingThreadDispatcher (all of Akka's test helpers are constructed using props with .withDispatcher(CallingThreadDispatcher.Id)), I can safely assume that when this statement returns:
watcher ! "start"
... the "start" message has already been processed by WatcherActor, and thus I can make assertions based on watcher.underlyingActor.state.
However, based on my observations, when I stop watchee using system.stop or by sending Kill to it, the Terminated message produced as a side effect of watchee's death is processed asynchronously, in another thread.
One non-solution is to stop watchee, block the thread for some time, and verify the Watcher state after that, but I'd like to know how to do this the right way (i.e. how can I be sure that, after killing an actor, its watcher has received and processed the Terminated message signaling its death)?
One way to fix this issue is to introduce another watcher in your test that also watches the watchee. This other watcher is a TestProbe, which lets us perform an assertion on it that gets rid of the timing issues you are seeing. First, the modified test code:
val watchee = TestProbe()
val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))
val probeWatcher = TestProbe()
probeWatcher watch watchee.ref
assert(watcher.underlyingActor.state === "initial")
watcher ! "start" // "start" will be sent and handled by watcher synchronously
assert(watcher.underlyingActor.state === "start")
system.stop(watchee.ref) // will cause Terminated to be sent and handled asynchronously by watcher
probeWatcher.expectTerminated(watchee.ref)
assert(watcher.underlyingActor.state === "terminated")
So you can see that I have introduced the additional watcher with the lines:
val probeWatcher = TestProbe()
probeWatcher watch watchee.ref
Then, later in the code, before the final assertion that is failing for you, I use another assertion that lets me know that the Terminated message for the stopped actor has been properly distributed:
probeWatcher.expectTerminated(watchee.ref)
When the code moves past this line, the watcher under test should also have received its Terminated message, and the assertion that follows will pass.
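Each watcher gets its own Terminated message, so the probe receiving its copy does not strictly prove that the watcher under test has already processed the other one. One hedged alternative is to poll the assertion with TestKit's awaitAssert, which retries it until it passes or a timeout expires. The sketch below assumes the ScalaTest 3.0.x style used in the question; the suite name AwaitAssertSketch and the 3 second / 50 millisecond values are illustrative choices, not something from the original post.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}
import akka.testkit.{TestActorRef, TestKit, TestProbe}
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
import scala.concurrent.duration._

// Same watcher as in the question, moved to the top level for brevity.
class WatcherActor(watchee: ActorRef) extends Actor {
  var state = "initial"
  context.watch(watchee)

  override def receive: Receive = {
    case "start"       => state = "start"
    case _: Terminated => state = "terminated"
  }
}

// Hypothetical suite name, used only for this sketch.
class AwaitAssertSketch(_system: ActorSystem) extends TestKit(_system)
    with FunSuiteLike with BeforeAndAfterAll {

  def this() = this(ActorSystem("AwaitAssertSketch"))

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  test("watcher eventually observes the watchee's termination") {
    val watchee = TestProbe()
    val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))

    system.stop(watchee.ref) // Terminated is delivered asynchronously

    // Re-evaluate the assertion every 50 ms until it passes,
    // and fail with the last assertion error after 3 seconds.
    awaitAssert(
      assert(watcher.underlyingActor.state === "terminated"),
      max = 3.seconds,
      interval = 50.millis
    )
  }
}
```

Compared with Thread.sleep, this returns as soon as the state changes and only waits the full timeout when the test is going to fail anyway.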
EDIT
As noted by the OP, there is a level of non-determinism with this code. Another possible solution is to change the line in the test code that stops the actor to:
watcher.underlyingActor.context.stop(watchee.ref)
By using the context of the TestActorRef, I believe the Terminated will be delivered entirely via the CallingThreadDispatcher and thus be completely synchronous. I tested this in a loop and it worked for me over 1000 iterations.
I then wondered whether, because I was performing the stop using the same actor that was expecting the Terminated, there was an optimization to deliver the Terminated to self for that scenario, so I also tested this with a completely different Actor, as follows:
class FooActor extends Actor {
  def receive = {
    case _ =>
  }
}
Then in the test code:
val foo = TestActorRef(new FooActor)
And on the stopping:
foo.underlyingActor.context.stop(watchee.ref)
This also worked as expected.
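For anyone who wants to repeat the loop check described above, here is a rough sketch of how it might look. It assumes the ScalaTest 3.0.x style from the question; the suite name ContextStopLoopSketch and the variable name stopper are made up for illustration. It only passes if stopping through the TestActorRef's context really is synchronous, which is the belief being tested rather than a documented guarantee.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}
import akka.testkit.{TestActorRef, TestKit, TestProbe}
import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}

// Same watcher as in the question, moved to the top level for brevity.
class WatcherActor(watchee: ActorRef) extends Actor {
  var state = "initial"
  context.watch(watchee)

  override def receive: Receive = {
    case "start"       => state = "start"
    case _: Terminated => state = "terminated"
  }
}

// Unrelated actor whose context is used only to stop the watchee.
class FooActor extends Actor {
  def receive = {
    case _ =>
  }
}

// Hypothetical suite name, used only for this sketch.
class ContextStopLoopSketch(_system: ActorSystem) extends TestKit(_system)
    with FunSuiteLike with BeforeAndAfterAll {

  def this() = this(ActorSystem("ContextStopLoopSketch"))

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  test("Terminated is handled before context.stop returns, repeatedly") {
    val stopper = TestActorRef(new FooActor)

    for (i <- 1 to 1000) {
      val watchee = TestProbe()
      val watcher = TestActorRef[WatcherActor](Props(new WatcherActor(watchee.ref)))

      // Stop the watchee through a TestActorRef's context, as in the answer.
      stopper.underlyingActor.context.stop(watchee.ref)

      // No sleep or polling: this fails if delivery was not synchronous.
      assert(watcher.underlyingActor.state === "terminated", s"failed on iteration $i")
    }
  }
}
```

A passing run is evidence rather than proof, so if the loop ever fails on your Akka version, polling the assertion is the safer fallback.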