Play Framework [2.2-scala]: creating Enumerator from a slow InputStream

I'm implementing an AWS S3 file delivery API. I'm forced to stream the bytes from S3's S3ObjectInputStream to the browser. We have a use case where serving the file with CloudFront is not an option (mainly local development).

I have an InputStream, so the most obvious approach would be to use Ok.chunked with Enumerator.fromStream(), but the documentation for Enumerator.fromStream() carries a very clear warning that the stream should not be slow. I'm assuming that an AWS S3ObjectInputStream is probably one of the slowest streams there is.

http://www.playframework.com/documentation/2.2.x/api/scala/index.html#play.api.libs.iteratee.Enumerator$

def fromStream(input: InputStream, chunkSize: Int = 1024 * 8)
              (implicit ec: ExecutionContext): Enumerator[Array[Byte]]

Create an enumerator from the given input stream.

This enumerator will block on reading the input stream, in the default iteratee
thread pool. Care must therefore be taken to ensure that this isn't a 
slow stream. If using this with slow input streams, consider setting the value
of iteratee-threadpool-size to a value appropriate for handling the blocking.

So I was wondering what the safest way is to avoid thread starvation and get the file streaming to the browser without holding the entire file in memory.

Is there another way to get an Enumerator (or something we can send in a Result) from an InputStream?

Jaap asked Nov 10 '22 16:11


1 Answer

I had actually misread the documentation: Enumerator.fromStream takes an implicit ExecutionContext, which you can supply yourself.
If you create a dedicated context for this specific type of blocking operation, you can still experience thread starvation, but you are in control of which thread pool gets that problem.
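To illustrate the isolation idea independently of any framework, here is a minimal sketch using only the Scala and Java standard libraries. It is not how Play implements Enumerator.fromStream; the object name `DedicatedPoolSketch`, the method `streamChunks`, and the pool size of 4 are all hypothetical choices made for the example. Blocking reads run on a dedicated pool, and each chunk is handed to a callback so the whole stream is never held in memory.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object DedicatedPoolSketch {
  // Hypothetical pool size; size it for the number of slow streams read concurrently.
  private val pool = Executors.newFixedThreadPool(4)

  // Dedicated context: blocking reads scheduled here only tie up this pool's threads.
  val blockingReads: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  // Reads `input` in chunks of at most `chunkSize` bytes on the supplied context,
  // passes each chunk to `onChunk`, and completes with the total number of bytes read.
  def streamChunks(input: InputStream, chunkSize: Int = 1024 * 8)
                  (onChunk: Array[Byte] => Unit)
                  (implicit ec: ExecutionContext): Future[Long] =
    Future {
      try {
        val buffer = new Array[Byte](chunkSize)
        var total = 0L
        var read = input.read(buffer) // blocking call, runs on `ec`
        while (read != -1) {
          // Copy only the bytes actually read, since the buffer is reused.
          onChunk(java.util.Arrays.copyOf(buffer, read))
          total += read
          read = input.read(buffer)
        }
        total
      } finally input.close()
    }

  // The pool's threads are non-daemon, so shut it down when finished.
  def shutdown(): Unit = pool.shutdown()

  def main(args: Array[String]): Unit = {
    // An in-memory stream stands in for a slow source such as an S3 object stream.
    val data = Array.fill[Byte](20000)(1)
    val done = streamChunks(new ByteArrayInputStream(data)) { chunk =>
      println("chunk of " + chunk.length + " bytes")
    }(blockingReads)
    println("streamed " + Await.result(done, 10.seconds) + " bytes")
    shutdown()
  }
}
```

Passing the context explicitly, as in `(blockingReads)`, mirrors how an ExecutionContext can be handed to Enumerator.fromStream in place of an implicit one.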

We're using Play, so we can simply configure Akka thread pools in our application.conf:

# this is a root value in the application.conf, but you can put it anywhere
# as long as you provide the full path to the .lookup() function
my-contexts {
  s3-streaming {
    fork-join-executor {
      parallelism-min = 50
      parallelism-max = 50
    }
  }
}

and use them in the code like this:

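import play.api.Play.current // implicit Application needed by Akka.system
import play.api.libs.concurrent.Akka
import scala.concurrent.ExecutionContext

object MyContexts {
  // Dispatcher configured under my-contexts.s3-streaming in application.conf
  val s3Streaming: ExecutionContext =
    Akka.system.dispatchers.lookup("my-contexts.s3-streaming")
}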

...

Enumerator.fromStream(stream)(MyContexts.s3Streaming)
Jaap answered Nov 22 '22 04:11