I'm trying to use the Alpakka S3 connector to do the following: download a number of files from an S3 bucket, zip them into a single archive, and upload that archive back to S3.
The code I used is something like this:
val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] =
  S3.multipartUpload("my-s3-bucket", "archive.zip")

val sourceList = (1 to 10).map(i =>
  S3.download("my-s3-bucket", s"random$i.png").map {
    case Some((s, m)) => (ArchiveMetadata(s"${UUID.randomUUID()}.png"), s)
  })

val source = Source.combine(sourceList.head, sourceList.tail.head, sourceList.tail.tail: _*)(Merge(_))

source
  .via(Archive.zip())
  .to(s3Sink)
  .run()
However, this results in the following error:
Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it.
I suspect this is because the underlying Akka HTTP client used by the S3 connector expects every download response to be consumed before moving on to the next one, but I wasn't able to handle this in a reasonable way without introducing waits/delays.
I tried using a queue with bufferSize = 1, but that didn't work either.
I'm fairly new to Akka and Akka Streams.
Let's break down what's happening here by dissecting the return type of S3.download (Scala DSL): Source[Option[(Source[ByteString, NotUsed], ObjectMetadata)], NotUsed].

The outer Source represents the outstanding request. The Option is empty if the file isn't found in the bucket. If it is present, it contains a pair of another Source, which represents the byte contents of the file, and the ObjectMetadata, which represents the metadata of the file you're downloading.
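To make the nesting concrete, here is a sketch (my addition, not from the original answer) of unwrapping a single download by hand; it assumes an implicit ActorSystem and its dispatcher are in scope, and the names are illustrative only:

// Sketch: unwrapping S3.download's nested result.
// Assumes `implicit val system: ActorSystem` (and its dispatcher)
// plus the usual Alpakka S3 / Akka Streams imports.
val bytes: Future[ByteString] =
  S3.download("my-s3-bucket", "random1.png")
    .runWith(Sink.head)            // materialize the outer Source
    .flatMap {
      case Some((data, metadata)) =>
        // `data` is already "hot": consume it promptly
        data.runWith(Sink.fold(ByteString.empty)(_ ++ _))
      case None =>
        Future.failed(new NoSuchElementException("object not found"))
    }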
What's counter-intuitive is that a Source is usually a cold, stateless, shareable part of a blueprint of some streaming action, only springing to life once it is materialized. For the outer Source, this is the case. The inner Source, however, is uncharacteristically, instantly "hot": once the outer Source is materialized and emits an item, that item represents an open HTTP connection that you're supposed to start consuming within (by default) 1 second, or else the "Response entity was not subscribed" error is raised.
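As an aside (my addition, not part of the original answer): that 1-second window comes from an Akka HTTP setting that can be raised in application.conf. Note that this only buys time rather than fixing the stream's structure:

# application.conf
# Grace period before "Response entity was not subscribed" is raised.
# Raising it merely postpones the error if the stream still leaves
# responses unconsumed; the default is 1 second.
akka.http.host-connection-pool.response-entity-subscription-timeout = 10 seconds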
In the original question, Source.combine is called with the Merge(_) strategy, which causes all the download sources to be materialized in parallel. Archive.zip will handle the files sequentially, but if fully consuming the first Source[ByteString] it receives takes longer than 1 second, the second request will time out before its turn comes.
One surefire way to make sure this doesn't happen is to consume the whole inner Source before handing the element over to the next stage. Consider:
import java.util.UUID

import akka.stream.{Attributes, DelayOverflowStrategy}
import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

Source(1 to 10)
  .flatMapMerge(4, i => S3.download("my-s3-bucket", s"random$i.png")
    .log("started file download")
    .addAttributes(Attributes.logLevels(onElement = Attributes.LogLevels.Info))
    .flatMapConcat {
      case Some((s, m)) =>
        // for demo purposes, make sure individual downloads take >1 second
        s.delay(2.seconds, DelayOverflowStrategy.backpressure)
          // read entire file contents into a single ByteString
          .reduce(_ ++ _)
          .map(bs => (ArchiveMetadata(s"${UUID.randomUUID()}.png"), Source.single(bs)))
      case None =>
        // file not found in the bucket: skip it (makes the match exhaustive)
        Source.empty
    })
  .log("completed file download")
  .addAttributes(Attributes.logLevels(onElement = Attributes.LogLevels.Info))
  .via(Archive.zip())
  .to(s3Sink)
  .run()
This (tested!) code downloads up to 4 files concurrently (the first parameter to flatMapMerge). Note how the reduce step reads the entire response into memory before passing it on to Archive.zip(). This is not ideal, but it might be acceptable for small files.
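If the files are too large to buffer in memory, one possible alternative (a sketch under assumptions, not part of the tested code above) is to spool each download to a temporary file, which fully drains the response entity, and then feed Archive.zip from disk:

// Sketch: drain each object to a temp file, then zip from disk.
// Assumes an implicit ActorSystem (and its dispatcher) in scope.
// Temp-file cleanup is omitted for brevity.
import java.nio.file.Files
import akka.stream.scaladsl.FileIO

Source(1 to 10)
  .mapAsync(parallelism = 4) { i =>
    val tmp = Files.createTempFile(s"random$i", ".png")
    S3.download("my-s3-bucket", s"random$i.png")
      .collect { case Some((data, _)) => data }
      .flatMapConcat(identity)
      .runWith(FileIO.toPath(tmp))   // fully consumes the response entity
      .map(_ => (ArchiveMetadata(s"random$i.png"), FileIO.fromPath(tmp)))
  }
  .via(Archive.zip())
  .to(s3Sink)
  .run()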