Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to download a HTTP resource to a file with Akka Streams and HTTP?

Over the past few days I have been trying to figure out the best way to download a HTTP resource to a file using Akka Streams and HTTP.

Initially I started with the Future-Based Variant and that looked something like this:

def downloadViaFutures(uri: Uri, file: File): Future[Long] = {
  val request = Get(uri)
  val responseFuture = Http().singleRequest(request)
  responseFuture.flatMap { response =>
    val source = response.entity.dataBytes
    source.runWith(FileIO.toFile(file))
  }
}

That was kind of okay but once I learnt more about pure Akka Streams I wanted to try and use the Flow-Based Variant to create a stream starting from a Source[HttpRequest]. At first this completely stumped me until I stumbled upon the flatMapConcat flow transformation. This ended up a little more verbose:

def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match {
  case (responseTry, context) => (responseTry.get, context)
}

def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match {
  case (response, _) => response.entity.dataBytes
}

def downloadViaFlow(uri: Uri, file: File): Future[Long] = {
  val request = Get(uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()
  source.
    via(requestResponseFlow).
    map(responseOrFail).
    flatMapConcat(responseToByteSource).
    runWith(FileIO.toFile(file))
}

Then I wanted to get a little tricky and use the Content-Disposition header.

Going back to the Future-Based Variant:

def destinationFile(downloadDir: File, response: HttpResponse): File = {
  val fileName = response.header[ContentDisposition].get.value
  val file = new File(downloadDir, fileName)
  file.createNewFile()
  file
}

def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = {
  val request = Get(uri)
  val responseFuture = Http().singleRequest(request)
  responseFuture.flatMap { response =>
    val file = destinationFile(downloadDir, response)
    val source = response.entity.dataBytes
    source.runWith(FileIO.toFile(file))
  }
}

But now I have no idea how to do this with the Future-Based Variant. This is as far as I got:

def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match {
  case (response, _) =>
    val source = responseToByteSource(in)
    val file = destinationFile(downloadDir, response)
    source.map((_, file))
}

def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = {
  val request = Get(uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()
  val sourceWithDest: Source[(ByteString, File), Unit] = source.
    via(requestResponseFlow).
    map(responseOrFail).
    flatMapConcat(responseToByteSourceWithDest(_, downloadDir))
  sourceWithDest.runWith(???)
}

So now I have a Source that will emit one or more (ByteString, File) elements for each File (I say each File since there is no reason the original Source has to be a single HttpRequest).

Is there anyway to take these and route them to a dynamic Sink?

I'm thinking something like flatMapConcat, such as:

def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ???

So that I could complete downloadViaFlow2 with:

def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = {
  val sink = FileIO.toFile(destination, true)
  Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right)
}
sourceWithDest.runWithMap {
  case (_, file) => destToSink(file)
}
like image 940
steinybot Avatar asked Jan 20 '16 22:01

steinybot


1 Answers

The solution does not require a flatMapConcat. If you don't need any return values from the file writing then you can use Sink.foreach:

def writeFile(downloadDir : File)(httpResponse : HttpResponse) : Future[Long] = {
  val file = destinationFile(downloadDir, httpResponse)
  httpResponse.entity.dataBytes.runWith(FileIO.toFile(file))
}

def downloadViaFlow2(uri: Uri, downloadDir: File) : Future[Unit] = {
  val request = HttpRequest(uri=uri)
  val source = Source.single((request, ()))
  val requestResponseFlow = Http().superPool[Unit]()

  source.via(requestResponseFlow)
        .map(responseOrFail)
        .map(_._1)
        .runWith(Sink.foreach(writeFile(downloadDir)))
}

Note that the Sink.foreach creates Futures from the writeFile function. Therefore there's not much back-pressure involved. The writeFile could be slowed down by the hard drive but the stream would keep generating Futures. To control this you can use Flow.mapAsyncUnordered (or Flow.mapAsync) :

val parallelism = 10

source.via(requestResponseFlow)
      .map(responseOrFail)
      .map(_._1)
      .mapAsyncUnordered(parallelism)(writeFile(downloadDir))
      .runWith(Sink.ignore)

If you want to accumulate the Long values for a total count you need to combine with a Sink.fold:

source.via(requestResponseFlow)
      .map(responseOrFail)
      .map(_._1)
      .mapAsyncUnordered(parallelism)(writeFile(downloadDir))
      .runWith(Sink.fold(0L)(_ + _))

The fold will keep a running sum and emit the final value when the source of requests has dried up.

like image 112
Ramón J Romero y Vigil Avatar answered Oct 11 '22 10:10

Ramón J Romero y Vigil