
Akka-Http Websockets: How to send all consumers the same stream of data

I have a WebSocket that clients can connect to, and I also have a stream of data built with akka-streams. How can I make sure that all clients get the same data? At the moment they seem to be racing for the data.

Thanks

Ciaran0 asked Jan 07 '23 16:01



1 Answer

One way to do this is to have an actor that extends ActorPublisher and subscribes to a message type on the actor system's event stream.

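import akka.actor.Props
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.Cancel
import scala.concurrent.ExecutionContext

class MyPublisher extends ActorPublisher[MyData] {

  // Every instance subscribes to MyData events on the system-wide event stream
  override def preStart(): Unit = {
    context.system.eventStream.subscribe(self, classOf[MyData])
  }

  override def receive: Receive = {

    case msg: MyData ⇒
      // Messages that arrive while the stream has no demand are dropped
      if (isActive && totalDemand > 0) {
        // Pushes the message onto the stream
        onNext(msg)
      }

    // The downstream (the websocket) cancelled, so stop this actor
    case Cancel ⇒
      context.stop(self)
  }
}

object MyPublisher {
  def props(implicit ctx: ExecutionContext): Props = Props(new MyPublisher())
}

case class MyData(data: String)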
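
The reason this works for more than one client is that the event stream delivers a published event to every subscriber of its class, not to just one of them. Here is a small sketch of that behaviour on its own, assuming only akka-actor on the classpath. EventStreamFanOut, Collector and run are names invented for the example, and the subscription is made from outside the actor (not in preStart) so that it is certainly in place before anything is published:

```scala
import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object EventStreamFanOut {
  final case class MyData(data: String)

  // Hypothetical subscriber: collects `expected` messages, then completes `result`
  class Collector(expected: Int, result: Promise[List[MyData]]) extends Actor {
    private var seen = List.empty[MyData]

    def receive: Receive = {
      case msg: MyData =>
        seen = msg :: seen
        if (seen.size == expected) result.success(seen.reverse)
    }
  }

  // Publishes `messages` once and returns what each collector received.
  // Assumes `messages` is non-empty, otherwise the collectors never complete.
  def run(messages: List[MyData], subscribers: Int): List[List[MyData]] = {
    val system = ActorSystem("fan-out-demo")
    try {
      val results = List.fill(subscribers) {
        val promise = Promise[List[MyData]]()
        val ref = system.actorOf(Props(new Collector(messages.size, promise)))
        // subscribe is a synchronous call, so it has taken effect before we publish
        system.eventStream.subscribe(ref, classOf[MyData])
        promise.future
      }
      messages.foreach(m => system.eventStream.publish(m))
      results.map(f => Await.result(f, 5.seconds))
    } finally system.terminate()
  }
}
```

Calling EventStreamFanOut.run with two messages and two subscribers should give back the same two messages twice, once per collector, which is the delivery that each MyPublisher instance relies on.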

You can then use that actor as the source for the stream:

val dataSource = Source.actorPublisher[MyData](MyPublisher.props(someExecutionContext))

You can then create a flow from that data source and apply a transform to convert the data into a WebSocket message:

val myFlow = Flow.fromSinkAndSource(Sink.ignore, dataSource map {d => TextMessage.Strict(d.data)})

Then you can use that flow in your route handling.

path("readings") {
  handleWebsocketMessages(myFlow)
} 

While processing the original stream, you can then publish the data to the event stream. Every instance of that actor will pick it up and put it onto the stream that its websocket is being served from.

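  implicit val actorSystem = ActorSystem("foo")
  // runForeach needs a materializer in implicit scope
  implicit val materializer = ActorMaterializer()

  val otherSource = Source.fromIterator(() => List(MyData("a"), MyData("b")).iterator)

  // Publish each element so that every MyPublisher instance receives it
  otherSource.runForeach { msg ⇒ actorSystem.eventStream.publish(msg) }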

Each socket then has its own instance of the actor, and every instance is fed from the same single source.
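
ActorPublisher was deprecated in Akka 2.5, so on a newer version the same fan-out could be done with a BroadcastHub instead. This is a different technique from the event-stream approach, and only a rough sketch, assuming Akka 2.6 or later with akka-http on the classpath. SharedFeed, shared and socketFlow are names made up for the example:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream.scaladsl.{BroadcastHub, Flow, Keep, Sink, Source}

object SharedFeed {
  // Hypothetical payload type, mirroring MyData from the answer
  final case class MyData(data: String)

  // Runs `upstream` once and returns a Source that can be materialized
  // many times; each materialization receives the elements emitted from
  // the moment it attaches. bufferSize must be a power of two.
  def shared(upstream: Source[MyData, Any])(implicit system: ActorSystem): Source[MyData, NotUsed] =
    upstream.toMat(BroadcastHub.sink[MyData](bufferSize = 256))(Keep.right).run()

  // One flow per websocket: incoming messages are ignored,
  // outgoing messages come from the shared feed
  def socketFlow(feed: Source[MyData, NotUsed]): Flow[Message, Message, NotUsed] =
    Flow.fromSinkAndSource(Sink.ignore, feed.map(d => TextMessage(d.data)))
}
```

You would call shared once at startup and pass socketFlow(feed) to the websocket directive in the route. Note that a client only sees elements emitted after it connects, and that the hub backpressures the upstream while no client is attached.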

Al Iacovella answered Jan 19 '23 07:01
