
Akka Streams over TCP

Here is the setup: I want to stream messages (JSON converted to ByteStrings) from a publisher to a remote subscriber server over a TCP connection.
Ideally, the publisher would be an actor that receives internal messages, queues them, and streams them to the subscriber server whenever there is outstanding demand. As I understand it, this requires extending the ActorPublisher class so that it can call onNext() with the messages when needed.
My problem is that so far I can only send one-shot messages to the server (which receives and decodes them properly), opening a new connection each time. I could not work out from the Akka docs how to set up the proper TCP Flow with the ActorPublisher.
Here is the code from the publisher:

def send(message: Message): Unit = {
  val system = Akka.system()
  implicit val sys = system

  import system.dispatcher

  implicit val materializer = ActorMaterializer()

  val address = Play.current.configuration.getString("eventservice.location").getOrElse("localhost")
  val port = Play.current.configuration.getInt("eventservice.port").getOrElse(9000)

  /*** Try with actorPublisher ***/
  //val result = Source.actorPublisher[Message](Props[EventActor]).via(Flow[Message].map(Json.toJson(_).toString.map(ByteString(_))))

  /*** Try with actorRef ***/
  /*val source = Source.actorRef[Message](0, OverflowStrategy.fail).map(
    m => {
      Logger.info(s"Sending message: ${m.toString}")
      ByteString(Json.toJson(m).toString)
    }
  )
  val ref = Flow[ByteString].via(Tcp().outgoingConnection(address, port)).to(Sink.ignore).runWith(source)*/

  val result = Source(Json.toJson(message).toString.map(ByteString(_)))
    .via(Tcp().outgoingConnection(address, port))
    .runFold(ByteString.empty) { (acc, in) => acc ++ in } // Handle the future
}

And here is the code from the actor, which is fairly standard:

import akka.actor.Actor
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError}
import akka.stream.actor.{ActorPublisherMessage, ActorPublisher}

import models.events.Message

import play.api.Logger

import scala.collection.mutable

class EventActor extends Actor with ActorPublisher[Message] {
  import ActorPublisherMessage._
  var queue: mutable.Queue[Message] = mutable.Queue.empty

  def receive = {
    case m: Message =>
      Logger.info(s"EventActor - message received and queued: ${m.toString}")
      queue.enqueue(m)
      publish()

    // Request is a case class carrying the requested demand, so it needs a pattern
    case Request(_) => publish()

    case Cancel =>
      Logger.info("EventActor - cancel message received")
      context.stop(self)

    case OnError(err: Exception) =>
      Logger.info("EventActor - error message received")
      onError(err)
      context.stop(self)

    case OnComplete =>
      Logger.info("EventActor - onComplete message received")
      onComplete()
      context.stop(self)
  }

  // Emit queued messages while the stream is active and downstream has demand
  def publish(): Unit = {
    while (queue.nonEmpty && isActive && totalDemand > 0) {
      Logger.info("EventActor - message published")
      onNext(queue.dequeue())
    }
  }
}

I can provide the code from the subscriber if necessary:

def connect(system: ActorSystem, address: String, port: Int): Unit = {
  implicit val sys = system
  import system.dispatcher
  implicit val materializer = ActorMaterializer()

  val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
    Logger.info("Event server connected to: " + conn.remoteAddress)
    // Get the ByteString flow and reconstruct the msg for handling and then output it back
    // that is how handleWith work apparently
    conn.handleWith(
      Flow[ByteString].fold(ByteString.empty)((acc, b) => acc ++ b).
        map(b => handleIncomingMessages(system, b.utf8String)).
        map(ByteString(_))
    )
  }

  val connections = Tcp().bind(address, port)
  val binding = connections.to(handler).run()

  binding.onComplete {
    case Success(b) =>
      Logger.info("Event server started, listening on: " + b.localAddress)
    case Failure(e) =>
      Logger.info(s"Event server could not bind to $address:$port: ${e.getMessage}")
      system.terminate()
  }
}

Thanks in advance for any hints.

Bill'o asked Mar 06 '16 17:03





1 Answer

My first recommendation is to not write your own queue logic; Akka provides this out of the box. You also don't need to write your own Actor, because Akka Streams can provide one as well.

First we can create the Flow that will connect your publisher to your subscriber over TCP. In your publisher code you only need to create the ActorSystem once and connect to the external server once:

//this code is at top level of your application

import akka.actor.{ActorRef, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source, Tcp}
import akka.util.ByteString

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
import actorSystem.dispatcher

val host = Play.current.configuration.getString("eventservice.location").getOrElse("localhost")
val port = Play.current.configuration.getInt("eventservice.port").getOrElse(9000)

val publishFlow = Tcp().outgoingConnection(host, port)

publishFlow is a Flow that takes as input the ByteString data you want to send to the external subscriber and outputs the ByteString data that comes back from the subscriber:

//  data to subscriber ----> publishFlow ----> data returned from subscriber

The next step is the publisher Source. Instead of writing your own Actor you can use Source.actorRef to "materialize" the Stream into an ActorRef. Essentially the Stream will become an ActorRef for us to use later:

//these values control the buffer
val bufferSize = 1024
val overflowStrategy = akka.stream.OverflowStrategy.dropHead

val messageSource = Source.actorRef[Message](bufferSize, overflowStrategy)
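
To make the two buffer parameters more concrete, here is a toy model in plain Scala of what a bounded buffer with a drop-head policy does when it is full: the oldest element is discarded to make room for the newest. This is only an illustration of the policy, not Akka's implementation, and the name DropHeadBuffer is hypothetical.

```scala
import scala.collection.mutable

/** Toy model of a bounded buffer that drops its oldest element when full.
  * Hypothetical helper for illustration only; Akka does this internally. */
final class DropHeadBuffer[A](capacity: Int) {
  require(capacity > 0, "capacity must be positive")

  private val queue = mutable.Queue.empty[A]

  /** Adds an element, discarding the oldest one first if the buffer is full. */
  def offer(elem: A): Unit = {
    if (queue.size == capacity) queue.dequeue()
    queue.enqueue(elem)
  }

  /** Removes and returns the oldest buffered element, if there is one. */
  def poll(): Option[A] =
    if (queue.isEmpty) None else Some(queue.dequeue())

  /** Snapshot of the buffered elements, oldest first. */
  def contents: List[A] = queue.toList
}
```

With a capacity of 3, offering the numbers 1 to 5 leaves only 3, 4 and 5 in the buffer. In the stream above this means that if messages arrive faster than the TCP connection can take them, the oldest unsent messages are silently lost, so pick the buffer size and strategy according to whether losing messages is acceptable.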

We also need a Flow to convert Messages into ByteStrings:

val marshalFlow = 
  Flow[Message].map(message => ByteString(Json.toJson(message).toString))

Finally we can connect all of the pieces. Since you aren't receiving any data back from the external subscriber we'll ignore any data coming in from the connection:

// to(...) keeps the Source's materialized value, so run() returns the ActorRef
val subscriberRef : ActorRef = messageSource.via(marshalFlow)
                                            .via(publishFlow)
                                            .to(Sink.ignore)
                                            .run()

We can now treat this stream as if it were an Actor:

val message1 : Message = ???

subscriberRef ! message1

val message2 : Message = ???

subscriberRef ! message2
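
One thing to watch for once the connection stays open: TCP is a byte stream and does not preserve message boundaries, and a fold over the incoming ByteStrings (as in the subscriber code in the question) only emits its result when the connection completes. With a long-lived connection the server therefore needs some way to tell where one JSON document ends. A common approach, sketched here in plain Scala under the assumption that each message is compact JSON followed by a newline, is delimiter-based framing. The names NewlineFraming, frame and extractFrames are hypothetical.

```scala
import java.nio.charset.StandardCharsets.UTF_8

/** Minimal sketch of newline-delimited framing (hypothetical helper). */
object NewlineFraming {

  /** Sender side: append a newline so the receiver can find the message end.
    * Assumes the JSON text itself contains no raw newline characters. */
  def frame(json: String): Array[Byte] = (json + "\n").getBytes(UTF_8)

  /** Receiver side: split the bytes received so far into complete messages
    * and the unfinished remainder, which must be kept for the next read. */
  def extractFrames(buffer: Array[Byte]): (List[String], Array[Byte]) = {
    val lastNewline = buffer.lastIndexOf('\n'.toByte)
    if (lastNewline < 0) (Nil, buffer)
    else {
      val complete = new String(buffer, 0, lastNewline, UTF_8)
      val frames = complete.split('\n').toList.filter(_.nonEmpty)
      (frames, buffer.drop(lastNewline + 1))
    }
  }
}
```

In an Akka Streams pipeline you would not hand-roll this: the publisher's marshalFlow would append the delimiter to each ByteString, and the server would put akka.stream.scaladsl.Framing.delimiter in front of its handling logic instead of the fold. Treat the exact wiring as something to verify against the Akka version you are using.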
Ramón J Romero y Vigil answered Oct 04 '22 15:10
