I took a stab at implementing a simple TCP-based protocol for exchanging messages with Akka Streams (see below). However, it seems like the incoming messages are not processed immediately; that is, in the scenario where two messages are sent one after another from the client, the first message is only printed after something is sent from the server:
At t=1, on [client] A is entered
At t=2, on [client] B is entered
At t=3, on [server] Z is entered
At t=4, on [server] A is printed
At t=5, on [server] Y is entered
At t=6, on [server] B is printed
What I expected/want to see:
At t=1, on [client] A is entered
At t=2, on [server] A is printed
At t=3, on [client] B is entered
At t=4, on [server] B is printed
At t=5, on [server] Z is entered
At t=6, on [client] Z is printed
At t=7, on [server] Y is entered
At t=8, on [client] Y is printed
What am I missing? Perhaps I need to, somehow, make the sinks at both ends eager? Or is each sink somehow blocked by the corresponding source (while the source is waiting for input from the command line)?
import java.nio.charset.StandardCharsets.UTF_8
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source, Tcp}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory
import scala.io.StdIn
object AkkaStreamTcpChatter extends App {
  implicit val system = ActorSystem("akka-stream-tcp-chatter", ConfigFactory.defaultReference())
  implicit val materializer = ActorMaterializer()
  type Message = String
  val (host, port) = ("localhost", 46235)
  val deserialize:ByteString => Message = _.utf8String
  val serialize:Message => ByteString = message => ByteString(message getBytes UTF_8)
  val incoming:Flow[ByteString, Message, _] = Flow fromFunction deserialize
  val outgoing:Flow[Message, ByteString, _] = Flow fromFunction serialize
  val protocol = BidiFlow.fromFlows(incoming, outgoing)
  def prompt(s:String):Source[Message, _] = Source fromIterator {
    () => Iterator.continually(StdIn readLine s"[$s]> ")
  }
  val print:Sink[Message, _] = Sink foreach println
  args.headOption foreach {
    case "server" => server()
    case "client" => client()
  }
  def server():Unit =
    Tcp()
      .bind(host, port)
      .runForeach { _
        .flow
        .join(protocol)
        .runWith(prompt("S"), print)
      }
  def client():Unit =
    Tcp()
      .outgoingConnection(host, port)
      .join(protocol)
      .runWith(prompt("C"), print)
}
I think the problem is that Akka Streams performs operator fusion, which means that the entire flow runs on a single actor. While that actor is blocked in StdIn.readLine waiting for your input, it cannot print anything.
The solution is to add an async boundary after your source. See the example below.
def server(): Unit =
  Tcp()
    .bind(host, port)
    .runForeach {
      _
        .flow
        .join(protocol)
        .runWith(prompt("S").async, print) // note .async here
    }
def client(): Unit =
  Tcp()
    .outgoingConnection(host, port)
    .join(protocol).async
    .runWith(prompt("C").async, print) // note .async here
When you add an async boundary, fusion does not happen across it, so the prompt runs on a separate actor and no longer blocks print from showing anything.