
Akka HTTP Streaming JSON Deserialization

Is it possible to incrementally deserialize an external ByteString stream of unknown length, coming from Akka HTTP, into domain objects?


Context

I call an HTTP endpoint whose response never ends; it outputs a JSON array that keeps growing:

[
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    { "prop": true, "prop2": false, "prop3": 97, "prop4": "sample" },
    ...
] <- Never sees the daylight
Martijn asked Dec 14 '15 16:12



1 Answer

I think JsonFraming.objectScanner(Int.MaxValue) is the right tool for this case. As the docs state:

Returns a Flow that implements a "brace counting" based framing operator for emitting valid JSON chunks. It scans the incoming data stream for valid JSON objects and returns chunks of ByteStrings containing only those valid chunks. Typical examples of data that one may want to frame using this operator include: Very large arrays
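To make "brace counting" concrete, here is a minimal sketch of the idea in plain Scala. It is not Akka's implementation: BraceCounter, State and feed are hypothetical names made up for illustration, and it works on Strings rather than ByteStrings. It tracks nesting depth and whether it is inside a string literal, and emits an object as soon as its top-level closing brace arrives, without waiting for the end of the array.

```scala
// Hypothetical helper for illustration only; NOT Akka's JsonFraming implementation.
object BraceCounter {
  // State carried between chunks: text of the object in progress,
  // current nesting depth, and string/escape flags.
  final case class State(buf: String = "", depth: Int = 0,
                         inString: Boolean = false, escaped: Boolean = false)

  // Feeds one chunk; returns the new state plus every object completed by it.
  def feed(state: State, chunk: String): (State, Vector[String]) =
    chunk.foldLeft((state, Vector.empty[String])) { case ((s, done), c) =>
      if (s.depth == 0) {
        // Outside any object: skip '[', ',', ']' and whitespace until '{' opens one.
        if (c == '{') (State(buf = "{", depth = 1), done) else (s, done)
      } else {
        val buf = s.buf + c
        if (s.inString) {
          // Braces inside string literals must not be counted.
          if (s.escaped) (s.copy(buf = buf, escaped = false), done)
          else if (c == '\\') (s.copy(buf = buf, escaped = true), done)
          else if (c == '"') (s.copy(buf = buf, inString = false), done)
          else (s.copy(buf = buf), done)
        } else c match {
          case '"' => (s.copy(buf = buf, inString = true), done)
          case '{' => (s.copy(buf = buf, depth = s.depth + 1), done)
          case '}' if s.depth == 1 => (State(), done :+ buf) // top-level object closed: emit
          case '}' => (s.copy(buf = buf, depth = s.depth - 1), done)
          case _   => (s.copy(buf = buf), done)
        }
      }
    }
}
```

Calling BraceCounter.feed repeatedly, passing the returned State back in with each new chunk, yields complete objects even when a chunk boundary falls in the middle of an object. The real operator additionally enforces a maximum object length, which is what the Int.MaxValue argument controls.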

So you can end up with something like this:

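import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{JsonFraming, Sink}
import io.circe.parser.decode // assumption: circe, with a Decoder[MyEntity] in scope
import scala.concurrent.Future
import scala.util.{Failure, Success}

// Assumes an implicit ActorSystem, an ExecutionContext and a `log` are in scope
val response: Future[HttpResponse] = Http().singleRequest(HttpRequest(uri = serviceUrl))

response.onComplete {
  case Success(value) =>
    value.entity.dataBytes
      .via(JsonFraming.objectScanner(Int.MaxValue)) // Int.MaxValue: effectively no per-object size limit
      .map(_.utf8String)         // ByteString -> String
      .map(decode[MyEntity](_))  // Use any Unmarshaller here
      .grouped(20)               // Batches of up to 20 decoded elements
      .runWith(Sink.ignore)      // Do whatever you need here
  case Failure(exception) => log.error(exception, "Api call failed")
}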
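If you want to try the framing step without a live endpoint, the following sketch feeds a few in-memory chunks through the same operator. Everything here apart from the Akka calls is an assumption for illustration: FramingDemo, firstTwoObjects and the sample chunks are made up, and it assumes Akka 2.6 or later, where an implicit ActorSystem is enough to materialize a stream. The array is deliberately never closed and one object is split across two chunks, mimicking the endpoint from the question.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{JsonFraming, Sink, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical demo object; names and sample data are illustrative only.
object FramingDemo {
  def firstTwoObjects(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("framing-demo")
    try {
      // The array is never closed, and the second object spans two chunks.
      val chunks = List(
        """[{"prop": true, "prop3": 97},""",
        """{"prop": fal""",
        """se, "prop3": 98},"""
      ).map(ByteString(_))

      val framed = Source(chunks)
        .via(JsonFraming.objectScanner(1024)) // fail if a single object exceeds 1024 bytes
        .map(_.utf8String)
        .take(2)                              // stop after two objects instead of waiting for ']'
        .runWith(Sink.seq)

      Await.result(framed, 5.seconds)         // blocking is acceptable in a demo only
    } finally system.terminate()
  }
}
```

Each element returned by FramingDemo.firstTwoObjects() should be the text of one complete JSON object, ready to hand to whatever decoder you use. A bounded maximum length such as 1024 is usually a safer choice than Int.MaxValue when the remote side is not trusted, since it caps how much a single malformed object can buffer.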
Dmitry Kach answered Nov 04 '22 10:11
