I am trying to write a Kafka consumer to websocket flow using reactive-kafka, akka-http and akka-stream.
val publisherActor = actorSystem.actorOf(CommandPublisher.props)
val publisher = ActorPublisher[String](publisherActor)
val commandSource = Source.fromPublisher(publisher) map toMessage
def toMessage(c: String): Message = TextMessage.Strict(c)

class CommandPublisher extends ActorPublisher[String] {
  override def receive = {
    case cmd: String =>
      if (isActive && totalDemand > 0)
        onNext(cmd)
  }
}

object CommandPublisher {
  def props: Props = Props(new CommandPublisher())
}

// This is the route
def mainFlow(): Route = {
  path("ws" / "commands") {
    handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource))
  }
}
From the Kafka consumer (omitted here), I do a publisherActor ! commandString to dynamically add content to the websocket.
However, I run into this exception in the backend when I start multiple clients to the websocket:
[ERROR] [03/31/2016 21:17:10.335] [KafkaWs-akka.actor.default-dispatcher-3][akka.actor.ActorSystemImpl(KafkaWs)] WebSocket handler failed with can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
java.lang.IllegalStateException: can not subscribe the same subscriber multiple times (see reactive-streams specification, rules 1.10 and 2.12)
at akka.stream.impl.ReactiveStreamsCompliance$.canNotSubscribeTheSameSubscriberMultipleTimesException(ReactiveStreamsCompliance.scala:35)
at akka.stream.actor.ActorPublisher$class.aroundReceive(ActorPublisher.scala:295)
...
Can't one flow be used for all websocket clients? Or should the flow/publisher actor be created per client?
Here, I intend to send out "current"/ "live" notifications to all websocket clients. History of notifications is irrelevant and needs to be ignored for new clients.
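I'm sorry to be the bearer of bad news, but it looks like this is the explicit design of Akka. You cannot reuse a single instance of the flow for all clients as you want to: the ActorPublisher behind your source accepts only one subscriber, so each additional client that materializes the flow fails with the exception you see. The fan-out must be "explicit" as a consequence of the Rx model.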
Examples I have come across use a routee-specific Flow:
// The flow from beginning to end to be passed into handleWebsocketMessages
def websocketDispatchFlow(sender: String): Flow[Message, Message, Unit] =
  Flow[Message]
    // First we convert the TextMessage to a ReceivedMessage
    .collect { case TextMessage.Strict(msg) => ReceivedMessage(sender, msg) }
    // Then we send the message to the dispatch actor which fans it out
    .via(dispatchActorFlow(sender))
    // The message is converted back to a TextMessage for serialization across the socket
    .map { case ReceivedMessage(from, msg) => TextMessage.Strict(s"$from: $msg") }

def route =
  (get & path("chat") & parameter('name)) { name =>
    handleWebsocketMessages(websocketDispatchFlow(sender = name))
  }
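Applied to the question's setup, one way to get a source per client is sketched below. It is only a sketch, assuming the Akka 2.4-era API used in the question: Source.actorRef creates a fresh actor for every materialization (so one per WebSocket connection), and each of those actors is subscribed to the actor system's event stream. The Command envelope, the object name PerClientCommands, the buffer size of 16 and the publish helper are hypothetical names and values chosen for illustration, not something taken from the question.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Flow, Sink, Source}

object PerClientCommands {
  // Hypothetical envelope type, so that subscribers only receive commands
  // and nothing else that happens to be published on the event stream.
  final case class Command(text: String)

  // A blueprint only: each materialization (one per WebSocket connection)
  // creates a fresh actor and subscribes it to the event stream.
  def commandSource(system: ActorSystem): Source[Message, Any] =
    Source
      .actorRef[Command](16, OverflowStrategy.dropHead) // buffer size is an assumption
      .mapMaterializedValue(ref => system.eventStream.subscribe(ref, classOf[Command]))
      .map(cmd => TextMessage(cmd.text))

  // Same route shape as in the question, but with a per-connection source.
  def route(system: ActorSystem): Route =
    path("ws" / "commands") {
      handleWebSocketMessages(Flow.fromSinkAndSource(Sink.ignore, commandSource(system)))
    }

  // Called from the Kafka consumer instead of `publisherActor ! commandString`.
  def publish(system: ActorSystem, commandString: String): Unit =
    system.eventStream.publish(Command(commandString))
}
```

Since a client is only subscribed once its connection is materialized, it should see only commands published after it connected, which matches the "live notifications only" requirement. With dropHead, a slow client loses its oldest buffered commands rather than slowing down the others.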
Here is a discussion on it:
And this is exactly what I don't like in Akka Stream, this explicit fan-out. When I receive a data source from somewhere that I want to process (e.g. Observable or a Source), I just want to subscribe to it and I don't want to care on whether it's cold or hot or whether it's been subscribed by other subscribers or not. This is my river analogy. The river should not care about who drinks from it and the drinkers should not care about the river's source or about how many other drinkers there are. My sample, that is equivalent to the one Mathias provided, does share the data-source, but it simply does reference counting and you can have 2 subscribers or you can have 100, doesn't matter. And here I've gotten fancy, but reference counting doesn't work if you don't want to lose events or if you want to ensure that the stream remains always-on. But then you use ConnectableObservable which has connect(): Cancelable and that's a perfect fit for say ... a Play's LifeCycle Plugin. And underlying that you can use a BehaviorSubject or a ReplaySubject if you want to repeat previous values for new subscribers. And things just work after that, no manual drawing of that connections graph needed.

...

(this is from https://bionicspirit.com/blog/2015/09/06/monifu-vs-akka-streams.html)

...

For functions that take an Observable and return an Observable, we indeed have lift, which is the closest thing to something that has a name and can be used to great effect in Monifu for Subject or other Observable types because of the LiftOperators1 (and 2), which is what makes it possible to transform Observables without losing their type - this is an OOP-ish improvement over what RxJava does with lift.

But, such functions are not equivalent to Processor/Subject. The difference is that Subject is at the same time a consumer and a producer. This means that subscribers do not get to control exactly when the data-source starts and that the data-source is in essence hot (meaning that multiple subscribers share the same data-source). In Rx it's totally fine if you model cold observables (meaning observables that start a new data-source per each individual subscriber). On the other hand in Rx (in general) it's not OK to have data sources that can be subscribed only once and then that's it. The only exception to this rule in Monifu are the Observables produced by the GroupBy operator, but that's like the exception that confirms the rule.

What this means, especially coupled with another restriction of the contract of both Monifu and the Reactive Streams protocol (thou shall not subscribe multiple times with the same consumer), is that a Subject or a Processor instance is not reusable. In order for such an instance to be reusable, the Rx model would need a factory of Processor. Furthermore this means that whenever you want to use a Subject/Processor, your data source must automatically be hot (shareable between multiple subscribers).
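For what it's worth, later Akka Streams releases (2.5 and up, which is newer than what the question uses) ship a BroadcastHub that makes the explicit fan-out fairly short and gives you something close to the "hot" shared source described in the quote. The following is a rough sketch under that assumption. The object name SharedCommands and the buffer sizes are hypothetical choices for illustration.

```scala
import akka.NotUsed
import akka.actor.ActorRef
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}

object SharedCommands {
  // Runs the producer side once and returns:
  //  - the ActorRef that the Kafka consumer sends command strings to
  //  - a Source that may be materialized any number of times, once per client
  def start()(implicit mat: Materializer): (ActorRef, Source[String, NotUsed]) = {
    val (ref, hotSource) =
      Source
        .actorRef[String](16, OverflowStrategy.dropHead) // buffer size is an assumption
        .toMat(BroadcastHub.sink[String](bufferSize = 256))(Keep.both) // must be a power of two
        .run()

    // Keep one permanent consumer attached so that elements are drained
    // (and not buffered for a future client) while no WebSocket is connected.
    hotSource.runWith(Sink.ignore)

    (ref, hotSource)
  }
}
```

Each connection would then use hotSource.map(toMessage) in place of commandSource, and the Kafka consumer would send its strings to the returned ActorRef. As far as I understand, the hub adapts to its slowest consumer, so a slow client can hold back the others until the upstream buffer starts dropping old commands.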