
Forward a file upload stream to S3 through Iteratee with Play2 / Scala

I've read about the possibility of sending a file to S3 through Iteratees, which seems to allow sending chunks of a file to S3 as we receive them, avoiding an OutOfMemoryError for large files, for example.

I've found this SO post, which is probably almost what I need to do: Play 2.x : Reactive file upload with Iteratees. I don't really understand how to do it, or whether it's actually available in Play 2.0.2 (because Sadek Drobi says foldM is available in Play 2.1 only, for example).

Can someone explain this in a simple way, for someone who has read a few blog posts about Iteratees but is not yet a Scala/Play2 expert?

I don't even know if I should use a multipart body parser or something like that, but one thing I do know is that I don't understand what this code is doing:

val consumeAMB =
  // accumulate up to 1 MB of input before emitting it as one chunk
  Traversable.takeUpTo[Array[Byte]](1024 * 1024) &>> Iteratee.consume()

val rechunkAdapter: Enumeratee[Array[Byte], Array[Byte]] =
  Enumeratee.grouped(consumeAMB)

// ConnectionHandle stands for whatever state type your S3 client threads through
val writeToStore: Iteratee[Array[Byte], ConnectionHandle] =
  Iteratee.foldM[Array[Byte], ConnectionHandle](connectionHandle) { (c, bytes) =>
    // write bytes and return the next handle, probably in a Future
  }

BodyParser(rh => (rechunkAdapter &>> writeToStore).map(Right(_)))

By the way, what will the difference in memory consumption be compared to using a classic Java InputStream/OutputStream? I am actually able to forward a 500 MB file to S3 in a non-blocking way, with very low memory consumption, without using Iteratees, using Java + AsyncHttpClient + Grizzly (though I guess it would also work with Netty).

So what's the advantage of using Iteratee?

One difference I can see is that the InputStream I get and forward to S3 is in my case backed by a temporary file (this is a CXF behavior), so it may not be as reactive as a Play Iteratee.

But with Iteratees, if the Enumerator produces bytes received from the connection and forwards them to S3 through an Iteratee, then if the connection to S3 is poor and the bytes can't be forwarded very fast, where are the "pending" bytes stored?

Sebastien Lorber, asked Sep 26 '12


1 Answer

Simple explanation? I'll try. :)

You're building a pipeline out of components. Once you've constructed the pipeline it can be sent data. It's an Iteratee, so it knows how to iterate on data.

The file you want to upload is contained in the request body and a BodyParser is what handles request bodies in Play. So you put your iteratee pipeline into a BodyParser. When a request is made your pipeline will be sent the data (it will iterate over it).

Your pipeline (rechunkAdapter &>> writeToStore) chunks data into 1MB bits then sends them to S3.

The first part of the pipeline (rechunkAdapter) does the chunking. It actually has its own mini-pipeline that accumulates the bytes (consumeAMB). Once the mini-pipe has received enough data to make a chunk, it sends the chunk down the main pipeline.
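If it helps to see the regrouping idea without Play's API, here is a rough plain-Scala model. The `rechunk` helper is a made-up stand-in: the real `Enumeratee.grouped` works incrementally as data arrives, rather than collecting all the input first.

```scala
// Simplified model of the rechunking step: regroup incoming byte
// arrays of arbitrary size into fixed-size chunks. This is a
// hypothetical helper, NOT Play's incremental Enumeratee.grouped.
object Rechunk {
  def rechunk(input: Seq[Array[Byte]], chunkSize: Int): Seq[Array[Byte]] =
    input.flatten.grouped(chunkSize).map(_.toArray).toSeq

  def main(args: Array[String]): Unit = {
    // Three uneven "network packets" become fixed 4-byte chunks
    // (the last chunk may be smaller, as with a file's final chunk).
    val packets = Seq(Array[Byte](1, 2, 3), Array[Byte](4, 5), Array[Byte](6, 7, 8, 9))
    rechunk(packets, 4).foreach(chunk => println(chunk.mkString(",")))
  }
}
```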

The second part of the pipeline (writeToStore) is like a loop that gets called on each chunk, giving you the opportunity to send each chunk to S3.
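A rough model of what `foldM` does with each chunk, in plain Scala: the `Handle` type and `uploadPart` function below are made-up stand-ins for your S3 client's state and upload call. The key point is that each upload starts only after the previous one has produced the next handle.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Simplified model of Iteratee.foldM: thread a "handle" through an
// asynchronous write for each chunk in sequence.
object FoldMSketch {
  final case class Handle(partsUploaded: Int) // hypothetical S3 upload state

  // Hypothetical stand-in for the real S3 part-upload call.
  def uploadPart(h: Handle, bytes: Array[Byte]): Future[Handle] =
    Future(Handle(h.partsUploaded + 1))

  def main(args: Array[String]): Unit = {
    val chunks = Seq(Array[Byte](1, 2), Array[Byte](3, 4), Array[Byte](5))
    // Each step flatMaps on the previous Future, so uploads run one
    // after another, exactly the shape foldM gives you.
    val done = chunks.foldLeft(Future.successful(Handle(0))) { (fh, bytes) =>
      fh.flatMap(h => uploadPart(h, bytes))
    }
    println(Await.result(done, 5.seconds).partsUploaded) // prints 3
  }
}
```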

Advantages of iteratees?

Once you know what's going on you can build iteration pipelines by plugging together components. And the type checker will most often tell you when you plug something together incorrectly.

For example, we can modify the pipeline above to fix the fact that it's slow. It's probably slow because the request upload is paused whenever a chunk is ready to upload to S3. It's important to slow down the request upload so that we don't run out of memory, but we could be a bit more forgiving by adding a fixed-size buffer. So just add Concurrent.buffer(2) into the middle of the pipeline to buffer up to 2 chunks.
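The behaviour of Concurrent.buffer(2) is roughly that of a bounded queue: the producer may run ahead by at most 2 chunks, then it is held back until the consumer catches up. Here is a sketch with a plain `ArrayBlockingQueue` standing in for the buffer (the "uploads" are simulated with a sleep):

```scala
import java.util.concurrent.ArrayBlockingQueue

// Simplified model of a fixed-size buffer between the request
// upload (producer) and the S3 upload (consumer). put() blocks once
// 2 chunks are pending, which is the back-pressure the text describes.
object BoundedBufferSketch {
  def main(args: Array[String]): Unit = {
    val buffer = new ArrayBlockingQueue[String](2)
    val consumer = new Thread(() => {
      for (_ <- 1 to 4) {
        val chunk = buffer.take() // blocks while the buffer is empty
        Thread.sleep(50)          // pretend the S3 upload is slow
        println(s"uploaded $chunk")
      }
    })
    consumer.start()
    for (i <- 1 to 4) buffer.put(s"chunk-$i") // blocks when buffer is full
    consumer.join()
  }
}
```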

Iteratees provide a functional approach to streams. This is an advantage or disadvantage, depending on how you feel about functional programming. :) Compared to lazy streams (another functional approach), iteratees offer precise control over resource usage.

Finally, iteratees allow us to do very complex asynchronous stream programming relatively (!) simply. We can process IO without holding threads, which is a huge win for scalability. The classic Java InputStream/OutputStream example requires 2 threads.
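For comparison, the classic blocking copy loop looks like this. The thread running it is parked inside read/write for the whole transfer, so a proxy forwarding a request body needs a thread held per stream (the in-memory streams here just make the sketch self-contained):

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

// The classic InputStream/OutputStream copy: the calling thread is
// blocked for the duration of the transfer. Iteratees do the same
// work without parking a thread on the IO.
object BlockingCopy {
  def copy(in: InputStream, out: OutputStream): Unit = {
    val buf = new Array[Byte](8192)
    var n = in.read(buf)     // blocks until bytes arrive
    while (n != -1) {
      out.write(buf, 0, n)   // blocks until bytes are written
      n = in.read(buf)
    }
  }

  def main(args: Array[String]): Unit = {
    val out = new ByteArrayOutputStream()
    copy(new ByteArrayInputStream("hello iteratees".getBytes("UTF-8")), out)
    println(out.toString("UTF-8"))
  }
}
```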

Rich Dougherty, answered Nov 15 '22