
Play Framework Scala: How to Stream Request Body

I'm building a microservice with Play Framework 2.3.x and Scala (I'm a beginner in both), but I can't figure out how to stream the request body.

Here is the problem:

I need an endpoint, /transform, that receives a huge TSV file, parses it, and renders it in another format: a simple transformation. The problem is that every statement in my controller runs "too late": Play waits until the full file has been received before it starts running my code.

Example:

  def transform = Action.async {
    Future {
      Logger.info("Too late")
      Ok("A response")
    }
  }

I want to read the request body line by line while it is still uploading, and start processing it without waiting for the whole file to arrive.

Any hint would be welcome.

Cecile asked Jul 08 '16 15:07


1 Answer

This answer applies to Play 2.5.x and higher, since it uses the Akka Streams API that replaced Play's Iteratee-based streaming in that version.

Basically, you can create a body parser that returns a Source[T], which you can then pass to Ok.chunked(...). One way to do this is to use Accumulator.source[T] in the body parser. For example, an action that just returns the data sent to it verbatim might look like this:

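import akka.stream.scaladsl.Source
import akka.util.ByteString
import play.api.libs.streams.Accumulator
import play.api.mvc._

// Accumulator#map needs an implicit ExecutionContext in scope
// (for example, one injected into the controller).
def verbatimBodyParser: BodyParser[Source[ByteString, _]] = BodyParser { _ =>
  // Return the source directly. We need to return
  // an Accumulator[Either[Result, T]], so if we were
  // handling any errors we could map to something like
  // a Left(BadRequest("error")). Since we're not,
  // we just wrap the source in a Right(...)
  Accumulator.source[ByteString]
    .map(Right.apply)
}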

def stream = Action(verbatimBodyParser) { implicit request =>
  Ok.chunked(request.body)
}

If you want to do something like transform a TSV file, you can use a Flow to transform the source, e.g.:

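import akka.NotUsed
import akka.stream.scaladsl.{Flow, Framing, Source}
import akka.util.ByteString
import play.api.libs.streams.Accumulator
import play.api.mvc._

val tsvToCsv: BodyParser[Source[ByteString, _]] = BodyParser { req =>

  val transformFlow: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString]
    // Split the incoming bytes into lines on "\n". A line longer than
    // 1000 bytes fails the stream; allowTruncation = true only lets a
    // final line with no trailing newline through instead of failing.
    .via(Framing.delimiter(ByteString("\n"), 1000, allowTruncation = true))
    // Replace tabs by commas. This is just a silly example and
    // you could obviously do something more clever here...
    .map(s => ByteString(s.utf8String.split('\t').mkString(",") + "\n"))

  // As above, .map needs an implicit ExecutionContext in scope.
  Accumulator.source[ByteString]
    .map(_.via(transformFlow))
    .map(Right.apply)
}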

def convert = Action(tsvToCsv) { implicit request =>
  Ok.chunked(request.body).as("text/csv")
}
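
Request body chunks arrive at arbitrary byte boundaries, so a single line can be split across two chunks; that is why the flow re-frames on newlines before mapping each line. As a rough mental model only (this is not Akka's implementation, it works on strings rather than bytes, and LineFraming, step and frame are hypothetical names), the re-framing could be sketched in plain Scala like this:

```scala
// Plain-Scala model of newline framing over arbitrary chunks.
// Hypothetical helper for illustration; Akka's Framing.delimiter
// operates on ByteString and is implemented differently.
object LineFraming {

  /** Splits `pending + chunk` into complete lines plus the unfinished remainder. */
  def step(pending: String, chunk: String, maxFrameLength: Int): (Vector[String], String) = {
    // limit = -1 keeps trailing empty strings, so the last element is
    // always the (possibly empty) text after the final newline.
    val parts = (pending + chunk).split("\n", -1)
    val lines = parts.init.toVector
    val rest  = parts.last
    // Mirror the "fail on over-long line" behaviour with an exception.
    (lines :+ rest).foreach { s =>
      require(s.length <= maxFrameLength, s"line longer than $maxFrameLength characters")
    }
    (lines, rest)
  }

  /** Frames a whole sequence of chunks, emitting a final line that has no trailing newline. */
  def frame(chunks: Seq[String], maxFrameLength: Int): Vector[String] = {
    val (lines, rest) = chunks.foldLeft((Vector.empty[String], "")) {
      case ((acc, pending), chunk) =>
        val (complete, remainder) = step(pending, chunk, maxFrameLength)
        (acc ++ complete, remainder)
    }
    if (rest.isEmpty) lines else lines :+ rest
  }
}
```

For example, LineFraming.frame(Seq("id\tna", "me\n1\tAda\n2", "\tGrace"), 1000) yields the three lines "id\tname", "1\tAda" and "2\tGrace", even though no chunk boundary coincides with a line boundary.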

There may be more inspiration in the "Directing the Body Elsewhere" section of the Play docs.
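
On the "something more clever" note in the conversion flow: simply joining fields with commas produces ambiguous CSV as soon as a field itself contains a comma or a double quote. A minimal sketch of a per-line conversion that quotes such fields, following the usual RFC 4180 convention, might look like the following. CsvLine and fromTsv are hypothetical names, and the sketch assumes the TSV input uses no escape sequences of its own:

```scala
// Hypothetical helper for illustration: converts one TSV line to one CSV line.
object CsvLine {

  // Quote a field only when it contains a character that would
  // otherwise be ambiguous in CSV; embedded quotes are doubled.
  private def quote(field: String): String =
    if (field.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r'))
      "\"" + field.replace("\"", "\"\"") + "\""
    else field

  // limit = -1 keeps trailing empty fields, which a plain split would drop.
  def fromTsv(line: String): String =
    line.split("\t", -1).map(quote).mkString(",")
}
```

Inside the flow, the mapping step could then become something like .map(s => ByteString(CsvLine.fromTsv(s.utf8String) + "\n")).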

Mikesname answered Sep 22 '22 00:09