How to split an inbound stream on a delimiter character using Akka Streams

I've been playing around with the experimental Akka Streams API and have a use case that I'd like to see how to implement. I have a StreamTcp-based Flow that is fed by the incoming connections bound to my server socket. The Flow operates on ByteString data. The incoming data contains a delimiter: everything before the delimiter is one message, and everything after it, up to the next delimiter, is the next message. Playing around with a simpler example that uses static text instead of sockets, this is what I came up with:

import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString

object BasicTransformation {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("Sys")

    val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")

    Flow(data).
      splitWhen(c => c == '.').
      foreach{producer => 
        Flow(producer).
          filter(c => c != '.').
          fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
          map(_.toString).
          filter(!_.isEmpty).
          foreach(println(_)).
          consume(FlowMaterializer(MaterializerSettings()))
      }.
      onComplete(FlowMaterializer(MaterializerSettings())) {
        case any =>
          system.shutdown
      }
  }
}

The main function on the Flow that I found to accomplish my goal was splitWhen, which produces a sub-flow for each message delimited by the . character. I then process each sub-flow with another pipeline of steps, printing the individual messages at the end.

This all seems a bit verbose for what I thought was a pretty simple and common use case. So my question is: is there a cleaner, less verbose way to do this, or is this the correct and preferred way to split a stream by a delimiter?

cmbaxter asked Sep 02 '14 19:09


3 Answers

I think Andrey's use of Framing is the best solution to your question, but I had a similar problem and found Framing to be too limited. I used statefulMapConcat instead, which lets you group your input ByteString using any rules you like. Here's the code for your question in case it helps anyone:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

object BasicTransformation extends App {

  implicit val system = ActorSystem("Sys")
  implicit val materializer = ActorMaterializer()
  implicit val dispatcher = system.dispatcher
  val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")

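  // statefulMapConcat creates fresh state (the `bytes` buffer) for each materialization.
  val grouping = Flow[Byte].statefulMapConcat { () =>
    var bytes = ByteString()
    byt =>
      if (byt == '.') {
        // Delimiter reached: emit the buffered bytes as one message and reset the buffer.
        val string = bytes.utf8String
        bytes = ByteString()
        List(string)
      } else {
        // Not a delimiter: keep buffering and emit nothing yet.
        bytes :+= byt
        Nil
      }
  }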

  Source(data).via(grouping).runForeach(println).onComplete(_ => system.terminate())
}

Which produces:

Lorem Ipsum is simply
Dummy text of the printing
And typesetting industry
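
The buffering that this stateful approach relies on can be sketched in plain Scala, without Akka, to show what happens when input arrives in chunks that end mid-message. This is only an illustration under stated assumptions: DelimiterSplitter, push and flush are hypothetical names, not Akka API, and it works on String chunks rather than ByteString to stay self-contained. Unlike the answer's code, it adds a flush step so that text after the last delimiter is not silently dropped.

```scala
// Hypothetical helper (not part of Akka): carries leftover text between chunks.
final class DelimiterSplitter(delimiter: Char) {
  private val pending = new StringBuilder

  // Feed one chunk; returns every message that this chunk completes.
  def push(chunk: String): List[String] = {
    val completed = List.newBuilder[String]
    chunk.foreach { c =>
      if (c == delimiter) {
        completed += pending.toString
        pending.clear()
      } else {
        pending.append(c)
      }
    }
    completed.result()
  }

  // Call once at end of input to recover an unterminated trailing message, if any.
  def flush(): Option[String] =
    if (pending.isEmpty) None
    else {
      val rest = pending.toString
      pending.clear()
      Some(rest)
    }
}

object DelimiterSplitterDemo extends App {
  val splitter = new DelimiterSplitter('.')
  // Chunk boundaries deliberately fall in the middle of messages.
  val chunks = List("Lorem Ipsum is sim", "ply.Dummy text", " of the printing.And type")
  val messages = chunks.flatMap(splitter.push) ++ splitter.flush().toList
  messages.foreach(println)
}
```

The push logic is what would sit inside a statefulMapConcat stage. Emitting the flushed remainder when the upstream completes needs something more than statefulMapConcat, which is one reason Framing is usually the simpler choice when its rules are sufficient.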

David answered Oct 16 '22 19:10


It looks like the API was recently improved to include akka.stream.scaladsl.Framing. The documentation also contains an example of how to use it. Concerning your specific question:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Source}
import akka.util.ByteString
import com.typesafe.config.ConfigFactory

object TcpDelimiterBasedMessaging extends App {
  object chunks {
    val first = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")
    val second = ByteString("More text.delimited by.a period.")
  }

  implicit val system = ActorSystem("delimiter-based-messaging", ConfigFactory.defaultReference())
  implicit val dispatcher = system.dispatcher
  implicit val materializer = ActorMaterializer()

  Source(chunks.first :: chunks.second :: Nil)
    .via(Framing.delimiter(ByteString("."), Int.MaxValue))
    .map(_.utf8String)
    .runForeach(println)
    .onComplete(_ => system.terminate())
}

Produces the following output:

Lorem Ipsum is simply
Dummy text of the printing
And typesetting industry
More text
delimited by
a period
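
The example above ends every message with a delimiter and passes Int.MaxValue as the frame limit. As a hedged sketch of the other two parameters of Framing.delimiter: maximumFrameLength bounds how much is buffered while waiting for a delimiter, and allowTruncation = true lets the final, unterminated message through when the stream completes. The limit of 1024 bytes and the object name BoundedFraming are assumptions chosen for illustration, and the code targets the same Akka version as the answer (ActorMaterializer).

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Framing, Sink, Source}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object BoundedFraming extends App {
  implicit val system = ActorSystem("bounded-framing")
  implicit val materializer = ActorMaterializer()

  // Chunk boundaries fall mid-message, and the last message has no trailing delimiter.
  val chunks = List(
    ByteString("Lorem Ipsum is sim"),
    ByteString("ply.Dummy text"),
    ByteString(" of the printing")
  )

  val framed = Source(chunks)
    // 1024 is an assumed limit; a frame longer than this fails the stream.
    .via(Framing.delimiter(ByteString("."), maximumFrameLength = 1024, allowTruncation = true))
    .map(_.utf8String)
    .runWith(Sink.seq)

  // Blocking here only keeps the demo short.
  val messages = Await.result(framed, 5.seconds)
  messages.foreach(println)
  system.terminate()
}
```

With the default allowTruncation = false, the same input would fail the stream at completion because the last frame is not terminated by a delimiter.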

Andrey answered Oct 16 '22 20:10


The Streams Cookbook in the akka-streams documentation now has sample code that does something similar, under "Parsing lines from a stream of ByteStrings".

Eric Zoerner answered Oct 16 '22 20:10