I've been playing around with the experimental Akka Streams API and I have a use case that I'd like to see how to implement. I have a StreamTcp-based Flow that is fed by binding the input stream of connections to my server socket. The Flow carries ByteString data. The incoming data contains a delimiter: everything before a delimiter is one message, and everything after it, up to the next delimiter, is the next message. Working with a simpler example that uses static text instead of sockets, this is what I came up with:
    import akka.actor.ActorSystem
    import akka.stream.{ FlowMaterializer, MaterializerSettings }
    import akka.stream.scaladsl.Flow
    import scala.util.{ Failure, Success }
    import akka.util.ByteString

    object BasicTransformation {
      def main(args: Array[String]): Unit = {
        implicit val system = ActorSystem("Sys")
        val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")
        Flow(data).
          splitWhen(c => c == '.').
          foreach { producer =>
            Flow(producer).
              filter(c => c != '.').
              fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
              map(_.toString).
              filter(!_.isEmpty).
              foreach(println(_)).
              consume(FlowMaterializer(MaterializerSettings()))
          }.
          onComplete(FlowMaterializer(MaterializerSettings())) {
            case any =>
              system.shutdown
          }
      }
    }
The main function on the Flow that I found to accomplish my goal was splitWhen, which produces additional sub-flows, one for each message as delimited by the . character. I then process each sub-flow with another pipeline of steps, finally printing the individual messages at the end.
This all seems a bit verbose for what I thought was a pretty simple and common use case. So my question is: is there a cleaner, less verbose way to do this, or is this the correct and preferred way to split a stream on a delimiter?
I think Andrey's use of Framing is the best solution to your question, but I had a similar problem and found Framing to be too limited. I used statefulMapConcat instead, which allows you to group your input ByteString using any rules you like. Here's the code for your question in case it helps anyone:
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Flow, Source}
    import akka.util.ByteString

    object BasicTransformation extends App {
      implicit val system = ActorSystem("Sys")
      implicit val materializer = ActorMaterializer()
      implicit val dispatcher = system.dispatcher // execution context for onComplete below

      val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")

      // The outer function runs once per materialization, so `bytes` is private to each stream.
      val grouping = Flow[Byte].statefulMapConcat { () =>
        var bytes = ByteString()
        byt =>
          if (byt == '.') {
            // Delimiter reached: emit the buffered message and reset the buffer.
            val string = bytes.utf8String
            bytes = ByteString()
            List(string)
          } else {
            // Not a delimiter: keep buffering and emit nothing.
            // Bytes after the last '.' stay in the buffer and are never emitted.
            bytes :+= byt
            Nil
          }
      }

      Source(data).via(grouping).runForeach(println).onComplete(_ => system.terminate())
    }
Which produces:
Lorem Ipsum is simply
Dummy text of the printing
And typesetting industry
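Because the argument to statefulMapConcat is just a factory that returns a stateful function, a grouping rule can be tried out on a plain collection before it is wired into a stream. The sketch below is only an illustration of that: the name lineGrouping and the rule itself (split on \n and drop a preceding \r) are assumptions made for the example, not something from the question, and it uses nothing beyond the Scala standard library.

```scala
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ArrayBuffer

// Hypothetical example, not part of Akka.
object GroupingSketch {
  // Same shape as the argument to statefulMapConcat:
  // a factory that creates fresh state and returns a function over elements.
  val lineGrouping: () => Byte => List[String] = { () =>
    val buffer = ArrayBuffer.empty[Byte]
    (byte: Byte) =>
      if (byte == '\n'.toByte) {
        // End of line: decode the buffered bytes, drop a trailing '\r', reset.
        val line = new String(buffer.toArray, StandardCharsets.UTF_8).stripSuffix("\r")
        buffer.clear()
        List(line)
      } else {
        // Keep buffering; nothing is emitted for this byte.
        buffer += byte
        Nil
      }
  }
}
```

Calling GroupingSketch.lineGrouping() yields a fresh function that can be passed to flatMap over a list of bytes. If the types line up as assumed here, the same value should also be usable as Flow[Byte].statefulMapConcat(GroupingSketch.lineGrouping).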
It looks like the API was recently improved to include akka.stream.scaladsl.Framing. The documentation also contains an example of how to use it. Concerning your specific question:
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.{Framing, Source}
    import akka.util.ByteString
    import com.typesafe.config.ConfigFactory

    object TcpDelimiterBasedMessaging extends App {
      // Two chunks standing in for data arriving from a socket.
      object chunks {
        val first = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")
        val second = ByteString("More text.delimited by.a period.")
      }

      implicit val system = ActorSystem("delimiter-based-messaging", ConfigFactory.defaultReference())
      implicit val dispatcher = system.dispatcher
      implicit val materializer = ActorMaterializer()

      Source(chunks.first :: chunks.second :: Nil)
        // Split on "."; the second argument is the maximum allowed frame length.
        .via(Framing.delimiter(ByteString("."), Int.MaxValue))
        .map(_.utf8String)
        .runForeach(println)
        .onComplete(_ => system.terminate())
    }
Produces the following output:
Lorem Ipsum is simply
Dummy text of the printing
And typesetting industry
More text
delimited by
a period
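With TCP, chunk boundaries will usually not line up with delimiters, and a framing stage deals with that by buffering the unterminated tail of each chunk until the rest arrives. As a rough illustration of the idea (this is not Akka's implementation, and splitFrames is a hypothetical helper working on plain strings rather than ByteString), a minimal sketch could look like this:

```scala
// Hypothetical helper, not part of Akka: delimiter framing over chunked input.
object FramingSketch {
  // Returns the complete frames found in `chunks`, plus whatever was left
  // over after the last delimiter (the unterminated remainder).
  def splitFrames(chunks: Seq[String], delimiter: Char): (Vector[String], String) =
    chunks.foldLeft((Vector.empty[String], "")) { case ((frames, buffer), chunk) =>
      // Prepend the leftover from earlier chunks to the new chunk.
      var pending = buffer + chunk
      var out = frames
      var idx = pending.indexOf(delimiter)
      // Cut off one frame per delimiter found.
      while (idx >= 0) {
        out = out :+ pending.substring(0, idx)
        pending = pending.substring(idx + 1)
        idx = pending.indexOf(delimiter)
      }
      // Whatever remains has no delimiter yet; carry it to the next chunk.
      (out, pending)
    }
}
```

For the chunks "Lorem Ip", "sum.Dol" and "or.Sit" with delimiter '.', this returns the frames "Lorem Ipsum" and "Dolor" and the remainder "Sit". In Framing.delimiter, what happens to such a remainder at the end of the stream is governed by the allowTruncation parameter, which defaults to false.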
There is now sample code that does something similar in the Streams Cookbook of the akka-streams documentation, under "Parsing lines from a stream of ByteStrings".