How to use Akka HTTP to generate contents via an output stream

I'm quite new to Akka Streams and Akka HTTP.

I'd like to write a simple HTTP server that generates a zip file from the contents of a folder and sends it to the client.

org.zeroturnaround.zip.ZipUtil makes creating a zip file very easy, but it needs an OutputStream.

Here is my solution (written in Scala):

            val os = new ByteArrayOutputStream()
            ZipUtil.pack(myFolder, os)
            HttpResponse(entity = HttpEntity(
                MediaTypes.`application/zip`,
                os.toByteArray))

This solution works, but it keeps the entire contents in memory, so it doesn't scale.

I think the key for solving this is to use this:

val source = StreamConverters.asOutputStream()

but I don't know how to use it. :-(

Any help please?

david.perez asked Dec 13 '16 16:12


2 Answers

Try this:

val byteSource: Source[ByteString, Unit] = StreamConverters.asOutputStream()
  .mapMaterializedValue(os => ZipUtil.pack(myFolder, os))
HttpResponse(entity = HttpEntity(
            MediaTypes.`application/zip`,
            byteSource))

You only get access to the OutputStream once the source is materialized, which might not happen immediately. In theory the source could also be materialized multiple times, so your code should be able to deal with that.
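
One caveat, as far as I can tell: the function passed to mapMaterializedValue runs synchronously while the stream is being materialized, and writes to the OutputStream block until downstream asks for data. For anything larger than the internal buffer it is probably safer to do the writing on a separate thread and to close the stream when done. Below is a minimal sketch of that idea, assuming Akka 2.4.2 or later (for NotUsed). OutputStreamSource is a hypothetical helper name, not part of Akka, and the ExecutionContext you pass in should be one that tolerates blocking calls.

```scala
import java.io.OutputStream

import akka.NotUsed
import akka.stream.scaladsl.{Source, StreamConverters}
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}

object OutputStreamSource {
  // Hypothetical helper: turns blocking code that writes to an OutputStream
  // into a Source of the bytes it writes.
  def apply(write: OutputStream => Unit)(
      implicit blockingEc: ExecutionContext): Source[ByteString, NotUsed] =
    StreamConverters.asOutputStream().mapMaterializedValue { os =>
      // Runs once per materialization, on another thread, so that
      // materialization is not held up by writes waiting for demand.
      Future {
        try write(os)
        finally os.close() // closing the stream completes the source
      }
      NotUsed
    }
}
```

With this in place the response entity could be built as HttpEntity(MediaTypes.`application/zip`, OutputStreamSource(os => ZipUtil.pack(myFolder, os))). Note that in this sketch an exception thrown by the writer is not propagated to the client; the stream simply ends early, so real code would at least want to log the failed Future.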

Rüdiger Klaehn answered Oct 10 '22 12:10


I had the same problem. To make it backpressure-compatible, I had to write an artificial InputStream, which is then converted to a Source via StreamConverters.fromInputStream(() => input), which in turn you return from the complete directive of the Akka HTTP routing DSL.
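
For illustration, the wiring described above might look roughly like this. It is only a sketch: DownloadRoute, entityFrom, the "download" path and the application/octet-stream content type are all assumptions made for the example, and openInput stands for whatever creates your InputStream.

```scala
import java.io.InputStream

import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.StreamConverters

object DownloadRoute {
  // Wraps an InputStream factory in a chunked entity. The factory is
  // invoked when the source is materialized, not when the entity is built.
  def entityFrom(openInput: () => InputStream): HttpEntity.Chunked =
    HttpEntity(
      ContentTypes.`application/octet-stream`,
      StreamConverters.fromInputStream(openInput))

  // Hypothetical route that streams the InputStream to the client.
  def route(openInput: () => InputStream): Route =
    path("download") {
      get {
        complete(entityFrom(openInput))
      }
    }
}
```

Because the stream is read in chunks on demand, a slow client slows down the reads instead of filling up memory.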

Here is what I wrote.

import java.io.{File, IOException, InputStream}
import java.nio.charset.StandardCharsets
import java.time.LocalDate
import java.time.format.DateTimeFormatter

import org.apache.commons.compress.archivers.sevenz.{SevenZArchiveEntry, SevenZFile}

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

class DownloadStatsZipReader(path: String, password: String) extends InputStream {

  private val (archive, targetDate) = {
    val inputFile = new SevenZFile(new File(path), password.getBytes(StandardCharsets.UTF_16LE))

    @tailrec
    def findValidEntry(): Option[(LocalDate, SevenZArchiveEntry)] =
      Option(inputFile.getNextEntry) match {
        case Some(entry) =>
          if (!entry.isDirectory) {
            val parts = entry.getName.toLowerCase.split("\\.(?=[^\\.]+$)")
            if (parts.length == 2 && parts(1) == "tab" && entry.getSize > 0) // names without an extension yield a single part
              Try(LocalDate.parse(parts(0), DateTimeFormatter.ISO_LOCAL_DATE)) match {
                case Success(localDate) =>
                  Some(localDate -> entry)
                case Failure(_) =>
                  findValidEntry()
              }
            else
              findValidEntry()
          } else
            findValidEntry()
        case None => None
      }

    val (date, _) = findValidEntry().getOrElse {
      throw new RuntimeException(s"$path has no files named as `YYYY-MM-DD.tab`")
    }
    inputFile -> date
  }

  private val buffer = new Array[Byte](1024)
  private var offsetBuffer: Int = 0
  private var sizeBuffer: Int = 0

  def getTargetDate: LocalDate = targetDate

  override def read(): Int =
    sizeBuffer match {
      case -1 =>
        -1
      case 0 =>
        loadNextChunk()
        read()
      case _ =>
        if (offsetBuffer < sizeBuffer) {
          val result = buffer(offsetBuffer) & 0xff // read() must return 0..255; an unmasked 0xff byte would read as -1 (end of stream)
          offsetBuffer += 1
          result
        } else {
          sizeBuffer = 0
          read()
        }
    }

  @throws[IOException]
  override def close(): Unit = {
    archive.close()
  }

  private def loadNextChunk(): Unit = try {
    val bytesRead = archive.read(buffer)
    if (bytesRead >= 0) {
      offsetBuffer = 0
      sizeBuffer = bytesRead
    } else {
      offsetBuffer = -1
      sizeBuffer = -1
    }
  } catch {
    case ex: Throwable =>
      ex.printStackTrace()
      throw ex
  }
}

If you find bugs in my code please let me know.
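
One thing worth checking in any hand-written InputStream like this one: read() has to return a value in the range 0..255, or -1 at end of stream. Bytes are signed on the JVM, so a byte taken from a buffer needs to be masked with 0xff before it is returned. A tiny self-contained sketch of the difference (UnsignedRead and toUnsigned are names made up for this example):

```scala
object UnsignedRead {
  // Converts a buffered byte to the 0..255 range that InputStream.read() must return.
  def toUnsigned(b: Byte): Int = b & 0xff

  def main(args: Array[String]): Unit = {
    val b: Byte = 0xff.toByte
    println(b.toInt)       // -1, indistinguishable from end of stream
    println(toUnsigned(b)) // 255
  }
}
```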

expert answered Oct 10 '22 14:10