I have a WebSocket that clients can connect to I also have a stream of data using akka-streams. How can I make it that all clients get the same data. At the moment they seem to be racing for the data.
Thanks
One way you could do is is to have an actor that extends ActorPublisher and have it subscribe to some message.
class MyPublisher extends ActorPublisher[MyData]{
override def preStart = {
context.system.eventStream.subscribe(self, classOf[MyData])
}
override def receive: Receive = {
case msg: MyData ⇒
if (isActive && totalDemand > 0) {
// Pushes the message onto the stream
onNext(msg)
}
}
}
object MyPublisher {
def props(implicit ctx: ExecutionContext): Props = Props(new MyPublisher())
}
case class MyData(data:String)
You can then use that actor as the source for the stream:
val dataSource = Source.actorPublisher[MyData](MyPublisher.props(someExcutionContext))
You can then create a flow from that datasource and apply a transform to convert the data into a websocket message
val myFlow = Flow.fromSinkAndSource(Sink.ignore, dataSource map {d => TextMessage.Strict(d.data)})
Then you can use that flow in your route handling.
path("readings") {
handleWebsocketMessages(myFlow)
}
From the processing of the original stream, you can then publish the data to the event stream and any instance of that actor will pick it up and put in onto the stream that their websocket is being served from.
val actorSystem = ActorSystem("foo")
val otherSource = Source.fromIterator(() => List(MyData("a"), MyData("b")).iterator)
otherSource.runForeach { msg ⇒ actorSystem.eventStream.publish(MyData("data"))}
Each socket will then have its own instance of the actor to provide it with data all coming from a single source.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With