Are there any good tutorials or explanations on how to use the event bus in Akka? I've read through the Akka docs, but I find it difficult to understand how to use the event bus.
Not sure if there are or aren't any good tutorials out there, but I can give you a quick example of a possible use case where using the event stream might be helpful. At a high level, the event stream is a good mechanism for meeting pub/sub-type requirements that your app might have.
Let's say that you have a use case where you update a user's balance in your system. The balance is accessed often, so you have decided to cache it for better performance. When a balance is updated, you also want to check whether the user has crossed a threshold with their balance and, if so, email them. You don't want either the cache drop or the balance threshold check to be tied directly into the main balance update call, as they might be heavyweight and slow down the user's response. You could model that particular set of requirements like so:
import akka.actor.Actor

// Message and event classes
case class UpdateAccountBalance(userId: Long, amount: Long)
case class BalanceUpdated(userId: Long)

// Actor that performs account updates
// (AccountManagerDao is assumed to be defined elsewhere)
class AccountManager extends Actor {
  val dao = new AccountManagerDao

  def receive = {
    case UpdateAccountBalance(userId, amount) =>
      // Publish BalanceUpdated only when the update yields a result
      val res = for (result <- dao.updateBalance(userId, amount)) yield {
        context.system.eventStream.publish(BalanceUpdated(userId))
        result
      }
      sender ! res
  }
}
// Actor that manages a cache of account balance data
class AccountCacher extends Actor {
  val cache = new AccountCache

  // Subscribe this actor to BalanceUpdated events when it starts
  override def preStart(): Unit = {
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
  }

  def receive = {
    case BalanceUpdated(userId) =>
      // Drop the stale balance so the next read reloads it
      cache.remove(userId)
  }
}
// Actor that checks the balance after an update to warn of a low balance
// (LowBalanceDao and sendBalanceEmail are assumed to be defined elsewhere)
class LowBalanceChecker extends Actor {
  val dao = new LowBalanceDao

  // Subscribe this actor to BalanceUpdated events when it starts
  override def preStart(): Unit = {
    context.system.eventStream.subscribe(context.self, classOf[BalanceUpdated])
  }

  def receive = {
    case BalanceUpdated(userId) =>
      for {
        balance <- dao.getBalance(userId)
        threshold <- dao.getBalanceThreshold(userId)
        if balance < threshold
      } {
        sendBalanceEmail(userId, balance)
      }
  }
}
In this example, the AccountCacher and LowBalanceChecker actors both subscribe to the eventStream by class type for the BalanceUpdated event. If this event is published to the stream, it will be received by both of these actor instances. Then, in the AccountManager, when the balance update succeeds, it raises a BalanceUpdated event for the user. When this happens, that message is delivered, in parallel, to the mailboxes of both the AccountCacher and the LowBalanceChecker, resulting in the balance info being dropped from the cache, the account threshold being checked, and possibly an email being sent.
Now, you could have just put direct tell (!) calls into the AccountManager to communicate directly with these other two actors, but one could argue that this couples the AccountManager too closely to these two "side effects" of a balance update, and that those types of details don't necessarily belong in the AccountManager. If you have a condition that might result in some additional things (checks, updates, etc.) that need to happen purely as side effects (not part of the core business flow itself), then the event stream might be a good way to decouple the event being raised from whoever might need to react to it.
There is an EventBus that exists for every ActorSystem. This EventBus is referred to as the Event Stream and can be obtained by calling system.eventStream.
The ActorSystem uses the Event Stream for a number of things including logging, sending Dead Letters and Cluster Events.
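As one illustration of these built-in uses, dead letters can be observed by subscribing an actor to the DeadLetter class on the event stream. The following is a minimal sketch, assuming classic Akka actors and Akka 2.4 or later (for system.terminate()); DeadLetterListener, DeadLetterExample and the system name are hypothetical names chosen for this example.

```scala
import akka.actor.{Actor, ActorSystem, DeadLetter, Props}

// Hypothetical listener (not part of Akka): prints each dead letter it receives.
class DeadLetterListener extends Actor {
  def receive = {
    case DeadLetter(message, from, to) =>
      println(s"Dead letter: $message (sender: $from, recipient: $to)")
  }
}

object DeadLetterExample {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("example")
    val listener = system.actorOf(Props(new DeadLetterListener), "dead-letter-listener")
    // subscribe returns true if the subscription was newly added
    val subscribed = system.eventStream.subscribe(listener, classOf[DeadLetter])
    println(s"subscribed: $subscribed")
    // Assumption: Akka 2.4+; older versions used a different shutdown call
    system.terminate()
  }
}
```

The subscription works the same way as the BalanceUpdated one: the classifier is simply the class of the event you want delivered to the subscriber.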
You can also use the Event Stream for your own publish/subscribe requirements. For example, the event stream can be useful during testing: subscribe the TestKit's testActor to the Event Stream for certain events (e.g. logging events) and you can then expect them. This is especially useful when your code would not otherwise send a message to another actor when something happens, but you still need to expect the event in your test.
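The testing idea could look roughly like the sketch below. It assumes the akka-testkit module is on the classpath and Akka 2.4 or later, and it uses a TestProbe in place of the TestKit's testActor. BalanceUpdated is redefined only to keep the snippet self-contained, and EventStreamTestExample is a hypothetical name.

```scala
import akka.actor.ActorSystem
import akka.testkit.TestProbe

// Redefined here so the snippet stands alone
case class BalanceUpdated(userId: Long)

object EventStreamTestExample {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("test")
    try {
      val probe = TestProbe()(system)
      // Route every BalanceUpdated published on the stream to the probe
      system.eventStream.subscribe(probe.ref, classOf[BalanceUpdated])
      // In a real test, the code under test would publish this event
      system.eventStream.publish(BalanceUpdated(42L))
      // Fails with an assertion error if the event does not arrive in time
      probe.expectMsg(BalanceUpdated(42L))
    } finally {
      system.terminate()
    }
  }
}
```

Because the probe is just another subscriber, the code under test does not need to know about it, which is the point of using the stream in tests.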
Note that the event stream only works within one ActorSystem. If you are using remoting, events published on the stream do not cross to remote systems by default (though you could add that support yourself).
You could theoretically create a separate EventBus if you didn't want to use the Event Stream.
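A separate bus might be sketched as follows, using the EventBus trait together with the LookupClassification mixin from akka.event. TopicMessage and TopicBus are hypothetical names invented for this example, and routing by an exact topic string is just one possible classification scheme. The exact signature of mapSize may differ slightly between Akka versions.

```scala
import akka.actor.ActorRef
import akka.event.{EventBus, LookupClassification}

// Hypothetical envelope type: the topic string decides who gets the payload.
final case class TopicMessage(topic: String, payload: Any)

// Hypothetical bus: delivers a payload to every subscriber of the exact topic.
class TopicBus extends EventBus with LookupClassification {
  type Event = TopicMessage
  type Classifier = String
  type Subscriber = ActorRef

  // Extract the classifier (the topic) from an incoming event.
  override protected def classify(event: Event): Classifier = event.topic

  // Called once per subscriber registered for the event's classifier.
  override protected def publish(event: Event, subscriber: Subscriber): Unit =
    subscriber ! event.payload

  // Subscribers need an ordering so the bus can index them.
  override protected def compareSubscribers(a: Subscriber, b: Subscriber): Int =
    a.compareTo(b)

  // Expected number of distinct classifiers (initial size of the index).
  override protected def mapSize(): Int = 128
}
```

With this sketch, calling bus.subscribe(ref, "greetings") and then bus.publish(TopicMessage("greetings", "hello")) would send "hello" to ref, while messages on other topics would not reach it.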
Better docs for the Event Bus are being worked on for Akka 2.2 so check back again when this ticket is complete.