I am building a Play Framework application in Scala in which I would like to stream an array of bytes to S3. I am using the Play-S3 library to do this. The "Multipart file upload" section of the documentation is what's relevant here:
// Retrieve an upload ticket
val result:Future[BucketFileUploadTicket] =
bucket initiateMultipartUpload BucketFile(fileName, mimeType)
// Upload the parts and save the tickets
val result:Future[BucketFilePartUploadTicket] =
bucket uploadPart (uploadTicket, BucketFilePart(partNumber, content))
// Complete the upload using both the upload ticket and the part upload tickets
val result:Future[Unit] =
bucket completeMultipartUpload (uploadTicket, partUploadTickets)
I am trying to do the same thing in my application but with Iteratees and Enumerators.
The streams and asynchronicity make things a little complicated, but here is what I have so far (note that uploadTicket is defined earlier in the code):
val partNumberStream = Stream.iterate(1)(_ + 1).iterator
val partUploadTicketsIteratee = Iteratee.fold[Array[Byte], Future[Vector[BucketFilePartUploadTicket]]](Future.successful(Vector.empty[BucketFilePartUploadTicket])) { (partUploadTickets, bytes) =>
bucket.uploadPart(uploadTicket, BucketFilePart(partNumberStream.next(), bytes)).flatMap(partUploadTicket => partUploadTickets.map( _ :+ partUploadTicket))
}
(body |>>> partUploadTicketsIteratee).andThen {
case result =>
result.map(_.map(partUploadTickets => bucket.completeMultipartUpload(uploadTicket, partUploadTickets))) match {
case Success(x) => x.map(d => println("Success"))
case Failure(t) => throw t
}
}
Everything compiles and runs without incident. In fact, "Success" gets printed, but no file ever shows up on S3.
There might be multiple problems with your code. The nested map method calls make it hard to read, and you might have a problem with your future composition. Another problem might be that all chunks (except for the last) must be at least 5MB.
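To illustrate the future composition point, here is a minimal sketch that uses only the Scala standard library, not Play-S3. The completeUpload function and the Int tickets are hypothetical stand-ins, and the failing completion call is an assumption made for the demonstration. It shows that calling a Future-returning function inside map yields a nested Future whose outer layer succeeds even when the inner call fails, whereas flatMap surfaces the failure.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FutureComposition {
  // Hypothetical stand-in for a completion call that is rejected by the server
  def completeUpload(tickets: Vector[Int]): Future[Unit] =
    Future.failed(new RuntimeException("completion rejected"))

  // Returns (outer future of the nested version succeeded, flattened future succeeded)
  def outcomes(): (Boolean, Boolean) = {
    val tickets: Future[Vector[Int]] = Future.successful(Vector(1, 2, 3))

    // map nests the futures: the outer one completes successfully as soon as
    // the tickets are available, whatever happens to the inner one
    val nested: Future[Future[Unit]] = tickets.map(completeUpload)

    // flatMap flattens them: the result succeeds only if the completion succeeds
    val flattened: Future[Unit] = tickets.flatMap(completeUpload)

    val nestedOk = Try(Await.result(nested, 5.seconds)).isSuccess
    val flattenedOk = Try(Await.result(flattened, 5.seconds)).isSuccess
    (nestedOk, flattenedOk)
  }

  def main(args: Array[String]): Unit = {
    val (nestedOk, flattenedOk) = outcomes()
    println(s"nested outer future succeeded: $nestedOk")
    println(s"flattened future succeeded: $flattenedOk")
  }
}
```

In the code from the question, completeMultipartUpload is called inside map, so "Success" can be printed even if the completion call fails. Composing with flatMap and handling the failure would probably reveal the actual error.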
The code below has not been tested, but shows a different approach. The iteratee approach is one where you can create small building blocks and compose them into a pipe of operations.
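To make the code compile I added the imports, a trait and a few methods:
import play.api.libs.iteratee.{Enumeratee, Enumerator, Iteratee, Traversable}
import scala.concurrent.Future
// mapM needs an implicit ExecutionContext; any suitable context will do
import scala.concurrent.ExecutionContext.Implicits.global
trait BucketFilePartUploadTicket
val uploadPart: (Int, Array[Byte]) => Future[BucketFilePartUploadTicket] = ???
val completeUpload: Seq[BucketFilePartUploadTicket] => Future[Unit] = ???
val body: Enumerator[Array[Byte]] = ???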
Here we create a few parts
// Create 5MB chunks
val chunked = {
val take5MB = Traversable.takeUpTo[Array[Byte]](1024 * 1024 * 5)
Enumeratee.grouped(take5MB transform Iteratee.consume())
}
// Add a counter, used as part number later on
val zipWithIndex = Enumeratee.scanLeft[Array[Byte]](0 -> Array.empty[Byte]) {
case ((counter, _), bytes) => (counter + 1) -> bytes
}
// Map the (Int, Array[Byte]) tuple to a BucketFilePartUploadTicket
val uploadPartTickets = Enumeratee.mapM[(Int, Array[Byte])](uploadPart.tupled)
// Construct the pipe to connect to the enumerator
// the ><> operator is an alias for compose, it is more intuitive because of
// its arrow-like structure
val pipe = chunked ><> zipWithIndex ><> uploadPartTickets
// Create a consumer that ends by finishing the upload
val consumeAndComplete =
Iteratee.getChunks[BucketFilePartUploadTicket] mapM completeUpload
Running it is done by simply connecting the parts
// This is the result, a Future[Unit]
val result = body through pipe run consumeAndComplete
Note that I did not test any of this code and might have made some mistakes in my approach. It does, however, show a different way of dealing with the problem and should help you find a good solution.
Note that this approach waits for one part to finish uploading before it takes on the next part. If the connection from your server to Amazon is slower than the connection from the browser to your server, this mechanism will slow the input.
You could take another approach where you do not wait for the Future of the part upload to complete. This would result in another step where you use Future.sequence to convert the sequence of upload futures into a single future containing a sequence of the results. The result would be a mechanism that sends a part to Amazon as soon as you have enough data.
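A minimal sketch of that idea, using only the Scala standard library: every part upload is started right away, and Future.sequence then collects the results in their original order. The Ticket case class and the uploadPart function here are hypothetical stand-ins for the Play-S3 types, and the parts are assumed to be already chunked.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelParts {
  // Hypothetical stand-in for a part upload ticket
  final case class Ticket(partNumber: Int, size: Int)

  // Hypothetical stand-in for the real upload call
  def uploadPart(partNumber: Int, bytes: Array[Byte]): Future[Ticket] =
    Future(Ticket(partNumber, bytes.length))

  def uploadAll(parts: Seq[Array[Byte]]): Future[Seq[Ticket]] = {
    // Start every upload immediately; part numbers start at 1
    val inFlight: Seq[Future[Ticket]] =
      parts.zipWithIndex.map { case (bytes, index) => uploadPart(index + 1, bytes) }

    // Turn Seq[Future[Ticket]] into Future[Seq[Ticket]], keeping the order;
    // the combined future fails if any single upload fails
    Future.sequence(inFlight)
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Array[Byte](1, 2), Array[Byte](3))
    val tickets = Await.result(uploadAll(parts), 5.seconds)
    println(tickets.map(_.partNumber))
  }
}
```

One trade-off to keep in mind: without waiting, nothing limits how many parts are in flight at once, so a fast client can make the server buffer many 5MB chunks in memory.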