Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Akka stream - Splitting a stream of ByteString into multiple files

I'm trying to split an incoming Akka stream of bytes (from the body of an http request, but it could also be from a file) into multiple files of a defined size.

For example, if I'm uploading a 10Gb file, it would create something like 10 files of 1Gb. The files would have randomly generated names. My issue is that I don't really know where to start, because all the responses and examples I've read are either storing the whole chunk into memory, or using a delimiter based on a string. Except I can't really have "chunks" of 1Gb, and then just write them to the disk..

Is there any easy way to perform that kind of operation ? My only idea would be to use something like this http://doc.akka.io/docs/akka/2.4/scala/stream/stream-cookbook.html#Chunking_up_a_stream_of_ByteStrings_into_limited_size_ByteStrings but transformed to something like FlowShape[ByteString, File], writting myself into a file the chunks until the max file size is reached, then creating a new file, etc.., and streaming back the created files. Which looks like an atrocious idea not using properly Akka..

Thanks in advance

like image 828
Vuzi Avatar asked Jan 04 '23 20:01

Vuzi


1 Answers

I often revert to purely functional, non-akka, techniques for problems such as this and then "lift" those functions into akka constructs. By this I mean I try to use only scala "stuff" and then try to wrap that stuff inside of akka later on...

File Creation

Starting with the FileOutputStream creation based on "randomly generated names":

def randomFileNameGenerator : String = ??? //not specified in question

import java.io.FileOutputStream

val randomFileOutGenerator : () => FileOutputStream = 
  () => new FileOutputStream(randomFileNameGenerator)

State

There needs to be some way of storing the "state" of the current file, e.g. the number of bytes already written:

case class FileState(byteCount : Int = 0, 
                     fileOut : FileOutputStream = randomFileOutGenerator())

File Writing

First we determine if we'd breach the maximum file size threshold with the given ByteString:

import akka.util.ByteString

val isEndOfChunk : (FileState, ByteString, Int) => Boolean =
  (state, byteString, maxBytes) =>
    state.byteCount + byteString.length > maxBytes

We then have to write the function that creates a new FileState if we've maxed out the capacity of the current one or returns the current state if it is still below capacity:

val closeFileInState : FileState => Unit = 
  (_ : FileState).fileOut.close()

val getCurrentFileState(FileState, ByteString, Int) => FileState = 
  (state, byteString, maxBytes) =>
    if(isEndOfChunk(maxBytes, state, byteString)) {
      closeFileInState(state)
      FileState()
    }
    else
      state

The only thing left is to write to the FileOutputStream:

val writeToFileAndReturn(FileState, ByteString) => FileState = 
  (fileState, byteString) => {
    fileState.fileOut write byteString.toArray
    fileState copy (byteCount = fileState.byteCount + byteString.size)
  }

//the signature ordering will become useful
def writeToChunkedFile(maxBytes : Int)(fileState : FileState, byteString : ByteString) : FileState =    
  writeToFileAndReturn(getCurrentFileState(maxBytes, fileState, byteString), byteString)    

Fold On Any GenTraversableOnce

In scala a GenTraversableOnce is any collection, parallel or not, that has the fold operator. These include Iterator, Vector, Array, Seq, scala stream, ... Th final writeToChunkedFile function perfectly matches the signature of GenTraversableOnce#fold:

val anyIterable : Iterable = ???

val finalFileState = anyIterable.fold(FileState())(writetochunkedFile(maxBytes))

One final loose end; the last FileOutputStream needs to be closed as well. Since the fold will only emit that last FileState we can close that one:

closeFileInState(finalFileState)

Akka Streams

Akka Flow gets its fold from FlowOps#fold which happens to match the GenTraversableOnce signature. Therefore we can "lift" our regular functions into stream values similar to the way we used Iterable fold:

import akka.stream.scaladsl.Flow

def chunkerFlow(maxBytes : Int) : Flow[ByteString, FileState, _] = 
  Flow[ByteString].fold(FileState())(writeToChunkedFile(maxBytes))

The nice part about handling the problem with regular functions is that they can be used within other asynchronous frameworks beyond streams, e.g. Futures or Actors. You also don't need an akka ActorSystem in unit testing, just regular language data structures.

import akka.stream.scaladsl.Sink
import scala.concurrent.Future

def byteStringSink(maxBytes : Int) : Sink[ByteString, _] = 
  chunkerFlow(maxBytes) to (Sink foreach closeFileInState)

You can then use this Sink to drain HttpEntity coming from HttpRequest.

like image 109
Ramón J Romero y Vigil Avatar answered Jan 12 '23 23:01

Ramón J Romero y Vigil