Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to implement a simple TCP protocol using Akka Streams?

I took a stab at implementing a simple TCP-based protocol for exchanging messages with Akka Streams (see below). However, it seems like the incoming messages are not processed immediately; that is, in the scenario where two messages are sent one after another from the client, the first message is only printed after something is sent from the server:

At t=1, on [client] A is entered
At t=2, on [client] B is entered
At t=3, on [server] Z is entered
At t=4, on [server] A is printed
At t=5, on [server] Y is entered
At t=6, on [server] B is printed

What I expected/want to see:

At t=1, on [client] A is entered
At t=2, on [server] A is printed
At t=3, on [client] B is entered
At t=4, on [server] B is printed
At t=5, on [server] Z is entered
At t=6, on [client] Z is printed
At t=7, on [server] Y is entered
At t=8, on [client] Y is printed

What am I missing? Perhaps I need to, somehow, make the sinks at both ends eager? Or is each sink somehow blocked by the corresponding source (while the source is waiting for input from the command line)?

import java.nio.charset.StandardCharsets.UTF_8

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BidiFlow, Flow, Sink, Source, Tcp}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory

import scala.io.StdIn

object AkkaStreamTcpChatter extends App {
  implicit val system = ActorSystem("akka-stream-tcp-chatter", ConfigFactory.defaultReference())
  implicit val materializer = ActorMaterializer()

  type Message = String
  val (host, port) = ("localhost", 46235)

  val deserialize:ByteString => Message = _.utf8String
  val serialize:Message => ByteString = message => ByteString(message getBytes UTF_8)

  val incoming:Flow[ByteString, Message, _] = Flow fromFunction deserialize
  val outgoing:Flow[Message, ByteString, _] = Flow fromFunction serialize

  val protocol = BidiFlow.fromFlows(incoming, outgoing)

  def prompt(s:String):Source[Message, _] = Source fromIterator {
    () => Iterator.continually(StdIn readLine s"[$s]> ")
  }

  val print:Sink[Message, _] = Sink foreach println

  args.headOption foreach {
    case "server" => server()
    case "client" => client()
  }

  def server():Unit =
    Tcp()
      .bind(host, port)
      .runForeach { _
        .flow
        .join(protocol)
        .runWith(prompt("S"), print)
      }

  def client():Unit =
    Tcp()
      .outgoingConnection(host, port)
      .join(protocol)
      .runWith(prompt("C"), print)
}
like image 910
Andrey Avatar asked Mar 26 '16 23:03

Andrey


1 Answers

I think the problem is that Akka Stream does operator fusion. This means that the complete flow handling runs on a single actor. When it blocks for reading your messages it cannot print out anything.

The solution would be to add an async boundary, after your source. See an example below.

def server(): Unit =
  Tcp()
    .bind(host, port)
    .runForeach {
      _
        .flow
        .join(protocol)
        .runWith(prompt("S").async, print) // note .async here
    }

def client(): Unit =
  Tcp()
    .outgoingConnection(host, port)
    .join(protocol).async
    .runWith(prompt("C").async, print) // note .async here

When you add an async boundary, then the fusion does not happen across the boundary, and the prompt runs on another actor, thus is not blocking print from showing anything.

like image 185
lpiepiora Avatar answered Sep 28 '22 02:09

lpiepiora