 

How do I extract a Future[String] from an akka Source[ByteString, _]?

I am attempting to stream a file using akka streams and am running into a small issue extracting the results of the stream into a Future[String]:

def streamMigrationFile(source: Source[ByteString, _]): Future[String] = {
  var fileString = ""
  val sink = Sink.foreach[ByteString](byteString => fileString = 
    fileString.concat(byteString.decodeString("US-ASCII")))
  source.runWith(sink)
}

I'm getting a compilation error:

Expression of type Future[Done] does not conform to expected type Future[String]

Can anyone help me understand what I'm doing wrong and what I need to do to extract the results of the stream?

Reece Long asked Jun 13 '17 19:06


2 Answers

If I'm guessing correctly, you want to stream the whole file content into a string. That is best achieved with a Sink.fold, not with a Sink.foreach. Example below.

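import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

// runWith needs a Materializer in implicit scope.
def streamMigrationFile(source: Source[ByteString, _])(implicit mat: Materializer): Future[String] = {
  // Fold every chunk into one accumulated String; the sink materializes Future[String].
  val sink = Sink.fold[String, ByteString]("") { (acc, bytes) =>
    acc + bytes.decodeString("US-ASCII")
  }
  source.runWith(sink)
}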

You're probably aware of this, but your file will need to fit into memory for your program to run correctly.
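If the file may contain multi-byte UTF-8 characters, decoding each chunk separately can garble a character whose bytes straddle a chunk boundary, and repeated String concatenation copies the growing string on every chunk. A possible variant, sketched below assuming Akka 2.6+ (where an implicit ActorSystem provides the Materializer) and UTF-8 content, folds the raw ByteString chunks together and decodes once at the end. It still holds the whole file in memory. The names FoldBytesThenDecode and streamToString are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FoldBytesThenDecode {
  // Concatenate the raw chunks first (ByteString's ++ typically links chunks
  // rather than copying them), then decode the complete bytes as UTF-8 once.
  def streamToString(source: Source[ByteString, _])(
      implicit mat: Materializer,
      ec: ExecutionContext
  ): Future[String] =
    source
      .runFold(ByteString.empty)(_ ++ _) // Future[ByteString]
      .map(_.utf8String)                 // single decode of the whole content

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("fold-bytes")
    import system.dispatcher // ExecutionContext for .map

    // "é" is 0xC3 0xA9 in UTF-8; here its two bytes arrive in separate chunks.
    val split = Source(List(ByteString(0xC3.toByte), ByteString(0xA9.toByte)))
    try println(Await.result(streamToString(split), 3.seconds))
    finally system.terminate()
  }
}
```

Decoding each of those two chunks on its own would not yield "é", which is why the decode happens only after the fold.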

Stefano Bonetti answered Nov 18 '22 02:11


If you look at the definition of Sink.foreach, you'll see that its type is Sink[T, Future[Done]]. The materialized Future[Done] only signals that the stream has finished; it carries nothing computed from the elements, so whatever your function does with them never shows up in the result. Here is the definition:

def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]

By contrast, Sink.fold materializes a Future[U], where U is the type of the zero (initial) value. In other words, you decide what type the future will hold once processing finishes.
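To see the difference in practice, here is a minimal sketch, assuming Akka 2.6+ (an implicit ActorSystem supplies the Materializer), that runs the same small source into both sinks. The object name and the numbers are illustrative only.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ForeachVsFold {
  // Source(1 to 3) is a reusable blueprint, so running it twice is safe here.
  def run()(implicit system: ActorSystem): (Done, Int) = {
    val source = Source(1 to 3)

    // Sink.foreach only signals completion: the materialized value is Future[Done].
    val viaForeach: Future[Done] = source.runWith(Sink.foreach[Int](_ => ()))

    // Sink.fold materializes the accumulator: Future[Int], because the zero is an Int.
    val viaFold: Future[Int] = source.runWith(Sink.fold[Int, Int](0)(_ + _))

    (Await.result(viaForeach, 3.seconds), Await.result(viaFold, 3.seconds))
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("foreach-vs-fold")
    try println(run()) // prints (Done,6)
    finally system.terminate()
  }
}
```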

The following is the definition (and implementation) for Sink.fold:

def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] =
          Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")

The implementation shows why: Flow.fold emits a single element, the final accumulator, when the upstream completes; Sink.head materializes a Future of that element; and Keep.right makes Sink.head's materialized value, a Future[U], the materialized value of the whole sink. Informally, the stream is saying: "I don't care that the incoming elements are Ts (ByteString in your case); when I'm done, I'll give you a U (String in your case) inside a Future."
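That composition can be reproduced by hand. The sketch below, assuming Akka 2.6+, assembles the same shape as the quoted implementation (Flow.fold, then Sink.head, keeping the right-hand materialized value) and compares it with Sink.fold. The object name is hypothetical, and `.named("foldSink")` is left out because it only sets a name attribute.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object HandBuiltFoldSink {
  // The zero value "" fixes U = String; each chunk is decoded and appended.
  private def append(acc: String, bytes: ByteString): String =
    acc + bytes.decodeString("US-ASCII")

  // Flow.fold emits one element (the final accumulator) on completion,
  // Sink.head materializes Future of that element, and Keep.right makes it
  // the sink's materialized value instead of the Flow's NotUsed.
  val handBuilt: Sink[ByteString, Future[String]] =
    Flow[ByteString].fold("")(append).toMat(Sink.head)(Keep.right)

  val library: Sink[ByteString, Future[String]] =
    Sink.fold[String, ByteString]("")(append)

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("hand-built-fold")
    val chunks = Source(List(ByteString("mig"), ByteString("ration")))
    try {
      val a = Await.result(chunks.runWith(handBuilt), 3.seconds)
      val b = Await.result(chunks.runWith(library), 3.seconds)
      println(s"$a $b") // prints "migration migration"
    } finally system.terminate()
  }
}
```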

The following is a working example of your case that adds a Sink.fold next to the original Sink.foreach, so that the whole expression evaluates to Future[String]:

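import akka.Done
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.Future

def streamMigrationFile(source: Source[ByteString, _])(implicit mat: Materializer): Future[String] = {

  var fileString = ""

  //def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]
  // Materializes only a completion signal; the text has to leak out through a var.
  val sinkForEach: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString] { byteString =>
    fileString = fileString.concat(byteString.decodeString("US-ASCII"))
  }

  /*
    def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] =
      Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")
   */
  // Materializes the accumulated String itself.
  val sinkFold: Sink[ByteString, Future[String]] = Sink.fold[String, ByteString]("") { (acc, bytes) =>
    acc + bytes.decodeString("US-ASCII") // plain `acc + bytes` would append ByteString's toString
  }

  // Run only one sink: each runWith materializes `source` again, which re-reads
  // the file and is not allowed at all for some single-use sources.
  // val res1: Future[Done] = source.runWith(sinkForEach)
  val res2: Future[String] = source.runWith(sinkFold)

  res2

}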
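Since the question is about a file, here is a usage sketch, assuming Akka 2.6+, that feeds an actual file into Sink.fold through FileIO.fromPath. The temporary file, its contents, and the names ReadMigrationFile and readAll are hypothetical stand-ins for the real migration file.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Sink}
import akka.util.ByteString

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ReadMigrationFile {
  // FileIO.fromPath materializes Future[IOResult], but runWith keeps the
  // sink's materialized value, so the result here is Future[String].
  def readAll(path: Path)(implicit system: ActorSystem): Future[String] =
    FileIO
      .fromPath(path)
      .runWith(Sink.fold[String, ByteString]("") { (acc, bytes) =>
        acc + bytes.decodeString("US-ASCII")
      })

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("read-migration")
    // Hypothetical migration file written to a temporary location for the demo.
    val path = Files.createTempFile("migration", ".sql")
    Files.write(path, "CREATE TABLE t (id INT);".getBytes(StandardCharsets.US_ASCII))
    try println(Await.result(readAll(path), 3.seconds))
    finally {
      Files.deleteIfExists(path)
      system.terminate()
    }
  }
}
```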
juanpavergara answered Nov 18 '22 04:11