
Multiple download requests with Alpakka S3 connector

I'm trying to use the Alpakka S3 connector to do the following:

  • Download a number of files from AWS S3
  • Stream the downloaded files via the Alpakka Zip Archive Flow
  • Upload the Zip stream back to S3 Sink

The code I used is something like this:

val s3Sink: Sink[ByteString, Future[MultipartUploadResult]] = S3.multipartUpload("my-s3-bucket", "archive.zip")

val sourceList = (1 to 10).map(i => S3.download("my-s3-bucket", s"random$i.png").map {
    case Some((s, m)) => (ArchiveMetadata(s"${UUID.randomUUID()}.png"), s)
})
val source = Source.combine(sourceList.head, sourceList.tail.head, sourceList.tail.tail: _*)(Merge(_))

source
    .via(Archive.zip())
    .to(s3Sink)
    .run()

However, this results in the following error:

Response entity was not subscribed after 1 second. Make sure to read the response entity body or call `discardBytes()` on it.

I suspect this is because the underlying Akka HTTP client used by the S3 connector expects every download response to be consumed before moving on to the next one, but I wasn't able to handle this in a reasonable way without introducing waits/delays. I tried using a queue with bufferSize = 1, but that didn't work either.

I'm fairly new to Akka and Akka Streams.

asked Nov 15 '22 by rogueai


1 Answer

Let's break down what's happening here by dissecting the return type of S3.download in the Scala API: Source[Option[(Source[ByteString, NotUsed], ObjectMetadata)], NotUsed].

The outer Source represents the outstanding request. The Option is None if the file isn't found in the bucket. If it is defined, it contains a tuple of another Source, which represents the byte contents of the file, and the ObjectMetadata, which describes the metadata of the file you're downloading.

What's counter-intuitive is that a Source is usually a cold, stateless, shareable part of a blueprint for some streaming action, only springing to life once it is materialized. For the outer Source, this is the case. The inner Source, however, is uncharacteristically "hot": once the outer Source is materialized and emits an element, that element represents an open HTTP connection that you're supposed to start consuming within (by default) 1 second, or else the "Response entity was not subscribed" error is raised.
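
To make the timing constraint concrete, here is a minimal sketch for a single download (not from the original answer; the bucket/key names and the byte counting are just for illustration). The important part is that the inner Source is consumed as soon as the element arrives:

import akka.actor.ActorSystem
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Sink

import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem()
import system.dispatcher

val totalBytes: Future[Long] =
  S3.download("my-s3-bucket", "random1.png")
    .runWith(Sink.head) // materializes the outer Source: the request is now in flight
    .flatMap {
      case Some((data, _)) =>
        // Consume the inner Source right away (here by summing chunk sizes),
        // so the response entity is subscribed before the timeout fires.
        data.runFold(0L)((acc, chunk) => acc + chunk.size)
      case None =>
        Future.successful(0L) // object not found
    }

If you don't need the bytes at all, draining them with data.runWith(Sink.ignore) serves the same purpose.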

In the original question, Source.combine is called with the Merge(_) strategy, which materializes all of the downloads at the same time. Archive.zip will handle the files sequentially, but if fully consuming the first Source[ByteString] it receives takes longer than 1 second, the second response will time out before the zip stage ever gets to it.

One surefire way of making sure this doesn't happen is to consume the whole inner Source before passing its contents downstream. Consider:

Source(1 to 10)
  .flatMapMerge(4, i => S3.download("my-s3-bucket", s"random$i.png")
    .log("started file download")
    .addAttributes(Attributes.logLevels(onElement = Attributes.LogLevels.Info))
    .flatMapConcat {
      case Some((s, m)) =>
        // for demo purposes, make sure individual downloads take >1 second
        s.delay(2.seconds, DelayOverflowStrategy.backpressure)
          // read entire file contents into a single ByteString
          .reduce(_ ++ _)
          .map(bs => (ArchiveMetadata(s"${UUID.randomUUID()}.png"), Source.single(bs)))
    })
  .log("completed file download")
  .addAttributes(Attributes.logLevels(onElement = Attributes.LogLevels.Info))
  .via(Archive.zip())
  .to(s3Sink)
  .run()

This (tested!) code downloads up to 4 files concurrently (the first parameter to flatMapMerge). Note how the reduce step reads the entire response into memory before passing it on to Archive.zip(). This is not ideal, but it might be acceptable for small files.
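
If buffering whole files in memory is a concern, one possible alternative is to defer each download with Source.lazySource so that a request is only issued once Archive.zip is ready to stream that entry's bytes. This is only a sketch I haven't tested (it assumes Akka 2.6+ for Source.lazySource and reuses the s3Sink and implicit ActorSystem from the question), and it gives up download concurrency entirely:

import java.util.UUID

import akka.stream.alpakka.file.ArchiveMetadata
import akka.stream.alpakka.file.scaladsl.Archive
import akka.stream.alpakka.s3.scaladsl.S3
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Source.lazySource defers running each download until Archive.zip actually
// starts reading that entry, so each response is consumed as soon as it arrives.
// Downloads therefore run strictly one after another.
Source(1 to 10)
  .map { i =>
    val lazyBytes: Source[ByteString, Any] =
      Source.lazySource { () =>
        S3.download("my-s3-bucket", s"random$i.png").flatMapConcat {
          case Some((data, _)) => data         // stream the file contents
          case None            => Source.empty // skip objects that don't exist
        }
      }
    (ArchiveMetadata(s"${UUID.randomUUID()}.png"), lazyBytes)
  }
  .via(Archive.zip())
  .to(s3Sink)
  .run()

The trade-off is the mirror image of the flatMapMerge version above: roughly constant memory per file, but no concurrent downloads.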

answered Dec 19 '22 by László van den Hoek