Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Consume TCP stream and redirect it to another Sink (with Akka Streams)

I try to redirect/forward a TCP stream to another Sink with Akka 2.4.3. The program should open a server socket, listen for incoming connections and then consume the tcp stream. Our sender does not expect/accept replies from us so we never send back anything - we just consume the stream. After framing the tcp stream we need to transform the bytes into something more useful and send it to the Sink.

I tried the following so far but i struggle especially with the part how to not sending tcp packets back to the sender and to properly connect the Sink.

import scala.util.Failure
import scala.util.Success

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Tcp
import akka.stream.scaladsl.Framing
import akka.util.ByteString
import java.nio.ByteOrder
import akka.stream.scaladsl.Flow

object TcpConsumeOnlyStreamToSink {
  implicit val system = ActorSystem("stream-system")
  private val log = Logging(system, getClass.getName)    

  //The Sink
  //In reality this is of course a real Sink doing some useful things :-)
  //The Sink accept types of "SomethingMySinkUnderstand"
  val mySink = Sink.ignore;

  def main(args: Array[String]): Unit = {
    //our sender is not interested in getting replies from us
    //so we just want to consume the tcp stream and never send back anything to the sender
    val (address, port) = ("127.0.0.1", 6000)
    server(system, address, port)
  }

  def server(system: ActorSystem, address: String, port: Int): Unit = {
    implicit val sys = system
    import system.dispatcher
    implicit val materializer = ActorMaterializer()
    val handler = Sink.foreach[Tcp.IncomingConnection] { conn =>
      println("Client connected from: " + conn.remoteAddress)

      conn handleWith Flow[ByteString]
      //this is neccessary since we use a self developed tcp wire protocol
      .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
      //here we want to map the raw bytes into something our Sink understands
      .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
      //here we like to connect our Sink to the Tcp Source
      .to(mySink) //<------ NOT COMPILING
    }


    val tcpSource = Tcp().bind(address, port)
    val binding = tcpSource.to(handler).run()

    binding.onComplete {
      case Success(b) =>
        println("Server started, listening on: " + b.localAddress)
      case Failure(e) =>
        println(s"Server could not bind to $address:$port: ${e.getMessage}")
        system.terminate()
    }

  }

  class SomethingMySinkUnderstand(x:String) {

  }
}

Update: Add this to your build.sbt file to get necessary deps

libraryDependencies += "com.typesafe.akka" % "akka-stream_2.11" % "2.4.3"
like image 495
salyh Avatar asked Aug 31 '16 20:08

salyh


1 Answers

handleWith expects a Flow, i.e. a box with an unconnected inlet and an unconnected outlet. You effectively provide a Source, because you connected the Flow with a Sink by using the to operation.

I think you could do the following:

conn.handleWith(
  Flow[ByteString]
    .via(Framing.lengthField(4, 0, 65532, ByteOrder.BIG_ENDIAN)) 
    .map(msg => new SomethingMySinkUnderstand(msg.utf8String))
    .alsoTo(mySink)
    .map(_ => ByteString.empty)
    .filter(_ => false) // Prevents sending anything back
)
like image 67
Heiko Seeberger Avatar answered Sep 30 '22 08:09

Heiko Seeberger