I'm trying to implement the read side in my ES-CQRS architecture. Let's say I have a persistent actor like this:
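import akka.NotUsed
import akka.actor.Props
import akka.persistence.PersistentActor
import akka.persistence.cassandra.query.scaladsl.CassandraReadJournal
import akka.persistence.query.{EventEnvelope, PersistenceQuery}
import akka.stream.scaladsl.Source

object UserWrite {
  sealed trait UserEvent
  sealed trait State
  case object Uninitialized extends State
  case class User(username: String, password: String) extends State
  case class AddUser(user: User)
  case class UserAdded(user: User) extends UserEvent
  case class UserEvents(userEvents: Source[(Long, UserEvent), NotUsed])
  case class UsersStream(fromSeqNo: Long)
  case object GetCurrentUser
  def props = Props(new UserWrite)
}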
class UserWrite extends PersistentActor {
  import UserWrite._

  private var currentUser: State = Uninitialized

  override def persistenceId: String = "user-write"

  // Rebuild state from the journal on startup
  override def receiveRecover: Receive = {
    case UserAdded(user) => currentUser = user
  }

  override def receiveCommand: Receive = {
    case AddUser(user: User) => persist(UserAdded(user)) {
      case UserAdded(`user`) => currentUser = user
    }
    case UsersStream(fromSeqNo: Long) => publishUserEvents(fromSeqNo)
    case GetCurrentUser => sender() ! currentUser
  }

  // Reply with a live stream of (sequence number, event) pairs for this actor
  def publishUserEvents(fromSeqNo: Long): Unit = {
    val readJournal = PersistenceQuery(context.system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
    val userEvents = readJournal
      .eventsByPersistenceId(persistenceId, fromSeqNo, Long.MaxValue)
      // collect skips envelopes whose payload is not a UserEvent instead of throwing a MatchError
      .collect { case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event }
    sender() ! UserEvents(userEvents)
  }
}
As far as I understand, each time an event is persisted, we can publish it via Akka Persistence Query. Now, I'm not sure what the proper way is to subscribe to these events so I can persist them in my read-side database. One idea is to initially send a UsersStream message from my read-side actor to the UserWrite actor and "sink" the events in that read actor.
EDIT
Following the suggestion of @cmbaxter, I implemented the read side this way:
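import akka.persistence.{PersistentActor, RecoveryCompleted, SnapshotOffer}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink

object UserRead {
  case object GetUsers
  case class GetUserByUsername(username: String)
  case class LastProcessedEventOffset(seqNo: Long)
  case object StreamCompleted
  def props = Props(new UserRead)
}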
class UserRead extends PersistentActor {
  import UserRead._
  import UserWrite.{User, UserAdded, UserEvent}

  var inMemoryUsers = Set.empty[User]
  var offset = 0L

  override val persistenceId: String = "user-read"

  override def receiveRecover: Receive = {
    // Recovery from a snapshot gives us the last processed sequence number
    case SnapshotOffer(_, LastProcessedEventOffset(seqNo)) => offset = seqNo
    case RecoveryCompleted => recoveryCompleted()
  }

  // Once recovery has completed, events are projected to the UserRead actor
  def recoveryCompleted(): Unit = {
    implicit val materializer = ActorMaterializer()
    PersistenceQuery(context.system)
      .readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
      .eventsByPersistenceId("user-write", offset + 1, Long.MaxValue)
      // collect skips envelopes whose payload is not a UserEvent instead of throwing a MatchError
      .collect {
        case EventEnvelope(_, _, seqNo, event: UserEvent) => seqNo -> event
      }
      .runWith(Sink.actorRef(self, StreamCompleted))
  }

  override def receiveCommand: Receive = {
    case GetUsers => sender() ! inMemoryUsers
    case GetUserByUsername(username) => sender() ! inMemoryUsers.find(_.username == username)
    // Match projected event and update offset.
    // Note: only the offset is snapshotted, so inMemoryUsers starts empty after a restart.
    case (seqNo: Long, UserAdded(user)) =>
      saveSnapshot(LastProcessedEventOffset(seqNo))
      inMemoryUsers += user
  }
}
There are some issues, for example: the event stream seems to be slow, i.e. the UserRead actor can answer with the set of users before the newly added user has been saved.
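A related point about the UserRead actor above: its snapshot holds only the offset, so after a restart the in-memory set would start empty while the stream resumes from offset + 1. Below is a minimal sketch of one way around that, assuming the projected users and the last applied sequence number are kept together in a single value. The ReadModel type is hypothetical, and User, UserEvent and UserAdded are redeclared only so the snippet compiles on its own.

```scala
final case class User(username: String, password: String)

sealed trait UserEvent
final case class UserAdded(user: User) extends UserEvent

// Hypothetical read-model state: projected users plus the last applied sequence number.
// Snapshotting this whole value would restore both pieces together.
final case class ReadModel(users: Set[User], lastSeqNo: Long) {
  // Apply an event once; anything at or below lastSeqNo is treated as a replay and skipped.
  def applyEvent(seqNo: Long, event: UserEvent): ReadModel =
    if (seqNo <= lastSeqNo) this
    else event match {
      case UserAdded(user) => copy(users = users + user, lastSeqNo = seqNo)
    }
}

object ReadModel {
  val empty: ReadModel = ReadModel(Set.empty, 0L)
}
```

Folding a batch of (seqNo, event) pairs over ReadModel.empty with applyEvent gives the state to snapshot, and replaying an already applied pair leaves it unchanged.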
EDIT 2
I decreased the refresh interval of the Cassandra query journal, which more or less solved the issue with the slow event stream. It appears that the Cassandra event journal is, by default, polled every 3 seconds. In my application.conf I added:
cassandra-query-journal {
  refresh-interval = 20ms
}
EDIT 3
Actually, do not decrease the refresh interval. Doing so increases memory usage, but that is neither dangerous nor the point. The general idea of CQRS is that the write and read sides are asynchronous, so data you write will never be available for reading immediately. As for the UI, I just open the stream and push data via server-sent events after the read side acknowledges them.
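The "push after the read side acknowledges" ordering can be sketched without any Akka or HTTP machinery. The NotifyingReadSide class and its listener callback below are hypothetical stand-ins (a real setup would forward to a server-sent events stream instead), and User is redeclared so the snippet compiles on its own. The only point is the order of operations: update the read model first, notify afterwards.

```scala
import scala.collection.mutable

final case class User(username: String, password: String)

// Hypothetical in-memory read side that notifies listeners only after applying an event.
final class NotifyingReadSide {
  private var users = Set.empty[User]
  private val listeners = mutable.ListBuffer.empty[User => Unit]

  // Register a callback, e.g. one that would feed a client-facing stream.
  def subscribe(listener: User => Unit): Unit = {
    listeners += listener
    ()
  }

  def project(user: User): Unit = {
    users += user                        // 1. update the read model
    listeners.foreach(notify => notify(user)) // 2. only then push to subscribers
  }

  def currentUsers: Set[User] = users
}
```

With this ordering, a client that reacts to the notification by querying the read side sees the new user, rather than racing against the projection.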
There are several ways to do this. For example, in my app I have an actor on the query side with a PersistenceQuery that is constantly looking for changes, but you could run the same query from a plain thread too. The point is to keep the stream open so that each persisted event can be read as soon as it happens.
val readJournal =
  PersistenceQuery(system).readJournalFor[CassandraReadJournal](
    CassandraReadJournal.Identifier)

// issue query to journal
val source: Source[EventEnvelope, NotUsed] =
  readJournal.eventsByPersistenceId("MyActorId", 0, Long.MaxValue)

// materialize stream, consuming events
implicit val mat = ActorMaterializer()
source.map(_.event).runForeach {
  case userEvent: UserEvent => doSomething(userEvent)
  case _ => () // ignore other event types instead of failing the stream with a MatchError
}
Instead of this, you can have a timer that runs a PersistenceQuery periodically and stores any new events, but I think that keeping a stream open is the best way.
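For comparison, the timer-based alternative could look roughly like the sketch below. It is plain Scala rather than Akka code: Poller and its fetchFrom parameter are hypothetical, with fetchFrom standing in for a one-shot journal query (in Akka Persistence Query that role would be played by currentEventsByPersistenceId). Each round fetches everything after the last seen sequence number and remembers how far it got.

```scala
// Hypothetical poller. fetchFrom(n) is assumed to return all (seqNo, event)
// pairs with seqNo >= n, in ascending order.
final class Poller[E](fetchFrom: Long => Seq[(Long, E)], handle: E => Unit) {
  private var lastSeqNo = 0L

  // One polling round: handle every new event and advance the offset.
  // Returns the number of events handled in this round.
  def pollOnce(): Int = synchronized {
    val batch = fetchFrom(lastSeqNo + 1)
    batch.foreach { case (seqNo, event) =>
      handle(event)
      lastSeqNo = seqNo
    }
    batch.size
  }

  // Sequence number of the last event handled so far.
  def offset: Long = synchronized(lastSeqNo)
}
```

A scheduler (for example a ScheduledExecutorService, or the actor system's scheduler) would call pollOnce at a fixed interval. The trade-off against the open stream is the same one as with the refresh interval: events are only seen at the next tick.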