I'm trying to implement an Akka Streams Flow that converts a stream of JSON objects into a stream that forms a single JSON array of those objects. I can use Concat to add a "[" before and a "]" after, and Zip to insert commas between elements, but I can't figure out how to avoid inserting the final comma.
The code I have so far is:
    trait JsonStreamSupport {
      protected def toJsonArrayString[T : Writes] =
        Flow[T].map(Json.toJson(_)).map(_.toString()).via(jsonArrayWrapper)

      private[this] val jsonArrayWrapper: Flow[String, String, NotUsed] =
        Flow.fromGraph(GraphDSL.create() { implicit b =>
          import GraphDSL.Implicits._
          val start = Source.single("[")
          val comma = Source.repeat(",")
          val end = Source.single("]")
          val concat = b.add(Concat[String](3))
          val zip = b.add(Zip[String,String])
          comma ~> zip.in1
          start ~> concat.in(0)
          zip.out.map({case (msg,delim) => msg + delim}) ~> concat.in(1)
          end ~> concat.in(2)
          FlowShape(zip.in0, concat.out)
        })
    }
Currently the output is: [{"key":"value"},{"key":"value"},]
but I need it to be: [{"key":"value"},{"key":"value"}]
(without the final comma), where each element of the array is still a distinct element of the stream, so that it can, for example, be sent separately over chunked HTTP.
I just found out about intersperse, which is exactly what you need and much simpler than what I suggested in the first place:
http://doc.akka.io/api/akka/2.4.4/index.html#akka.stream.scaladsl.Flow@intersperse[T%3E:Out]%28start:T,inject:T,end:T%29:FlowOps.this.Repr[T]
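For illustration, here is a minimal sketch of how the wrapper from the question might look with intersperse(start, inject, end). It assumes an Akka 2.4.x setup with ActorMaterializer; the object name IntersperseExample and the helper render are hypothetical and only there to make the example runnable. Note that intersperse emits "[", "," and "]" as stream elements of their own rather than appending them to the neighbouring message, so each JSON object still travels as a distinct element.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of the original question's code.
object IntersperseExample {

  // Emits "[" first, "," between consecutive elements, and "]" last.
  val jsonArrayWrapper: Flow[String, String, NotUsed] =
    Flow[String].intersperse("[", ",", "]")

  // Hypothetical helper: runs the flow over a fixed list and joins the chunks.
  def render(elements: List[String]): String = {
    implicit val system: ActorSystem = ActorSystem("intersperse-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val chunks = Source(elements).via(jsonArrayWrapper).runWith(Sink.seq)
      Await.result(chunks, 3.seconds).mkString
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    // prints [{"key":"value"},{"key":"value"}]
    println(render(List("""{"key":"value"}""", """{"key":"value"}""")))
  }
}
```

In the trait from the question, the GraphDSL-based jsonArrayWrapper could presumably be replaced by the single intersperse line, leaving toJsonArrayString unchanged.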