I'm quite new to Akka Streams and Akka HTTP.
I'd like to write a simple HTTP server that generates a zip file from the contents of a folder and sends it to the client.
org.zeroturnaround.zip.ZipUtil makes creating a zip file very easy, but it needs an OutputStream to write to.
Here is my solution (written in Scala):
val os = new ByteArrayOutputStream()
ZipUtil.pack(myFolder, os)
HttpResponse(entity = HttpEntity(
  MediaTypes.`application/zip`,
  os.toByteArray))
This solution works, but it keeps the whole archive in memory, so it isn't scalable.
I think the key to solving this is:
val source = StreamConverters.asOutputStream()
but I don't know how to use it. :-(
Any help, please?
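Try this:
// Needs scala.concurrent.Future and an implicit ExecutionContext in scope.
// pack blocks until downstream reads, so it must not run on the materializing thread.
val byteSource: Source[ByteString, Future[Unit]] = StreamConverters.asOutputStream()
  .mapMaterializedValue(os => Future {
    try ZipUtil.pack(myFolder, os)
    finally os.close() // closing the stream completes the source
  })
HttpResponse(entity = HttpEntity(
  MediaTypes.`application/zip`,
  byteSource))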
You only get access to the OutputStream once the source is materialized, which might not happen immediately. In theory the source could also be materialized multiple times, so your code should be able to handle that.
I had the same problem. To make it backpressure-compatible I had to write an artificial InputStream, which is later converted to a Source via StreamConverters.fromInputStream(() => input), which in turn you return from your Akka HTTP DSL complete directive.
Here is what I wrote:
import java.io.{File, IOException, InputStream}
import java.nio.charset.StandardCharsets
import java.time.LocalDate
import java.time.format.DateTimeFormatter

import org.apache.commons.compress.archivers.sevenz.{SevenZArchiveEntry, SevenZFile}

import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

class DownloadStatsZipReader(path: String, password: String) extends InputStream {

  // Opens the archive and positions it on the first non-empty `YYYY-MM-DD.tab` entry.
  private val (archive, targetDate) = {
    val inputFile = new SevenZFile(new File(path), password.getBytes(StandardCharsets.UTF_16LE))

    @tailrec
    def findValidEntry(): Option[(LocalDate, SevenZArchiveEntry)] =
      Option(inputFile.getNextEntry) match {
        case Some(entry) =>
          if (!entry.isDirectory) {
            // Split the name into base name and extension at the last dot.
            val parts = entry.getName.toLowerCase.split("\\.(?=[^\\.]+$)")
            // The length check guards against names without an extension.
            if (parts.length == 2 && parts(1) == "tab" && entry.getSize > 0)
              Try(LocalDate.parse(parts(0), DateTimeFormatter.ISO_LOCAL_DATE)) match {
                case Success(localDate) =>
                  Some(localDate -> entry)
                case Failure(_) =>
                  findValidEntry()
              }
            else
              findValidEntry()
          } else
            findValidEntry()
        case None => None
      }

    val (date, _) = findValidEntry().getOrElse {
      throw new RuntimeException(s"$path has no files named as `YYYY-MM-DD.tab`")
    }
    inputFile -> date
  }

  private val buffer = new Array[Byte](1024)
  private var offsetBuffer: Int = 0
  private var sizeBuffer: Int = 0 // 0 = buffer empty, -1 = end of entry reached

  def getTargetDate: LocalDate = targetDate

  override def read(): Int =
    sizeBuffer match {
      case -1 =>
        -1
      case 0 =>
        loadNextChunk()
        read()
      case _ =>
        if (offsetBuffer < sizeBuffer) {
          // Mask to 0-255: a raw Byte sign-extends, so 0xFF would be returned as -1 (end of stream).
          val result = buffer(offsetBuffer) & 0xFF
          offsetBuffer += 1
          result
        } else {
          sizeBuffer = 0
          read()
        }
    }

  @throws[IOException]
  override def close(): Unit = {
    archive.close()
  }

  // Refills the buffer from the current archive entry.
  private def loadNextChunk(): Unit = try {
    val bytesRead = archive.read(buffer)
    if (bytesRead >= 0) {
      offsetBuffer = 0
      sizeBuffer = bytesRead
    } else {
      offsetBuffer = -1
      sizeBuffer = -1
    }
  } catch {
    case ex: Throwable =>
      ex.printStackTrace()
      throw ex
  }
}
If you find bugs in my code please let me know.
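For the zip-a-folder case from the question, the same "expose an InputStream" idea can be sketched with JDK classes only. This is a minimal sketch, not the code I actually run: `ZipPipe` and `zippedFolder` are hypothetical names, it uses java.util.zip instead of ZipUtil, and it only packs the regular files directly inside the folder (no recursion). A background thread writes the archive into a bounded pipe, so it blocks whenever the reader falls behind.

```scala
import java.io.{File, InputStream, PipedInputStream, PipedOutputStream}
import java.nio.file.Files
import java.util.zip.{ZipEntry, ZipOutputStream}

object ZipPipe {
  // Hypothetical helper: zips the regular files directly inside `folder`
  // and exposes the archive bytes as an InputStream.
  def zippedFolder(folder: File): InputStream = {
    // Bounded buffer: the writer thread blocks while it is full.
    val in  = new PipedInputStream(64 * 1024)
    val out = new PipedOutputStream(in)

    val writer = new Thread(new Runnable {
      override def run(): Unit = {
        val zip = new ZipOutputStream(out)
        try {
          val files = Option(folder.listFiles())
            .getOrElse(Array.empty[File])
            .filter(_.isFile)
            .sortBy(_.getName)
          files.foreach { f =>
            zip.putNextEntry(new ZipEntry(f.getName))
            Files.copy(f.toPath, zip) // streams the file, never loads it whole
            zip.closeEntry()
          }
        } finally zip.close() // also closes `out`, which ends the reader's stream
      }
    })
    writer.setDaemon(true)
    writer.start()
    in
  }
}
```

You would then pass it as StreamConverters.fromInputStream(() => ZipPipe.zippedFolder(myFolder)). Since the factory function is called for each materialization, every materialization gets its own pipe and writer thread. One caveat of this sketch: if packing fails halfway, the reader just sees a truncated archive, so real code would want to propagate the error.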