I am attempting to stream a file using Akka Streams and am running into a small issue extracting the results of the stream into a Future[String]:
def streamMigrationFile(source: Source[ByteString, _]): Future[String] = {
  var fileString = ""
  val sink = Sink.foreach[ByteString](byteString => fileString =
    fileString.concat(byteString.decodeString("US-ASCII")))
  source.runWith(sink)
}
I'm getting a compilation error:
Expression of type Future[Done] does not conform to expected type Future[String]
Can anyone help me understand what I'm doing wrong and what I need to do to extract the results of the stream?
If what I'm guessing is right, you want to stream the whole file content into a string. This is best achieved with Sink.fold, not with Sink.foreach. Example below.
def streamMigrationFile(source: Source[ByteString, _]): Future[String] = {
  val sink = Sink.fold[String, ByteString]("") { case (acc, str) =>
    acc + str.decodeString("US-ASCII")
  }
  source.runWith(sink)
}
You're probably aware of this, but your file will need to fit into memory for your program to run correctly.
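To try the idea end to end, here is a minimal self-contained sketch. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to materialize a stream), and the object name, the `run` helper, and the in-memory chunks standing in for the file are all hypothetical. It also swaps in a small variation: it folds the raw ByteString chunks with `runFold` and decodes once at the end, rather than decoding every chunk separately.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

object FoldDemo {
  // Accumulate the raw bytes, then decode the whole buffer once.
  // The implicit ActorSystem supplies the Materializer (assumption: Akka 2.6+).
  def streamMigrationFile(source: Source[ByteString, _])(
      implicit system: ActorSystem, ec: ExecutionContext): Future[String] =
    source
      .runFold(ByteString.empty)(_ ++ _)
      .map(_.decodeString("US-ASCII"))

  // Hypothetical driver: runs the stream over two in-memory chunks.
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("fold-demo")
    import system.dispatcher // ExecutionContext for the map on the Future
    try {
      val chunks = Source(List(ByteString("CREATE TABLE "), ByteString("t (id INT);")))
      // Blocking is acceptable in a demo; real code would compose the Future instead.
      Await.result(streamMigrationFile(chunks), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Either way the whole content is held in memory; the only difference is that the concatenation happens on bytes instead of on strings.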
If you look at the definition of Sink.foreach, you'll find its type is Sink[T, Future[Done]], which means the result of processing each element is discarded: the function returns Unit, and the Future[Done] only signals that the stream has completed. Following is the definition:
def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]
On the other hand, Sink.fold evaluates to a Future[U], where U is the type of the zero value. In other words, you define the type of the future produced at the end of the processing.
The following is the definition (and implementation) of Sink.fold:
def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] =
Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")
According to the implementation above, the type kept in the materialization is Future[U] because Keep.right keeps the materialized value of Sink.head, which means something like: "I don't care if the elements coming in are Ts (or ByteString in your case), I (the stream) will give you a U (or a String in your case) when I'm done (in a Future)".
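The effect of Keep.right can be seen by building the same sink by hand. This is only a sketch mirroring the implementation quoted above: the object and value names are hypothetical, and it assumes Akka 2.6 or later so that an implicit ActorSystem can materialize the stream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object KeepRightDemo {
  // A folding flow: ByteString chunks in, a single String out.
  private val folding: Flow[ByteString, String, NotUsed] =
    Flow[ByteString].fold("")((acc, bytes) => acc + bytes.decodeString("US-ASCII"))

  // Keep.right keeps the materialized value of Sink.head: a Future[String].
  val keepsRight: Sink[ByteString, Future[String]] =
    folding.toMat(Sink.head[String])(Keep.right)

  // Keep.left would keep the flow's NotUsed instead, so the result would be unreachable.
  val keepsLeft: Sink[ByteString, NotUsed] =
    folding.toMat(Sink.head[String])(Keep.left)

  // Hypothetical driver: runs the hand-built sink over two in-memory chunks.
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("keep-right-demo")
    try {
      val done: Future[String] =
        Source(List(ByteString("ab"), ByteString("c"))).runWith(keepsRight)
      Await.result(done, 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Both sinks process the elements identically; only the value handed back to the caller differs, which is why the choice of Keep determines the type that `runWith` returns.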
The following is a working example of your case, replacing Sink.foreach with Sink.fold so that the whole expression evaluates to Future[String]:
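def streamMigrationFile(source: Source[ByteString, _]): Future[String] = {
  // Assumes an implicit Materializer (or, on Akka 2.6+, an implicit ActorSystem) is in scope.
  var fileString = ""
  // def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]]
  // Shown for comparison only: it materializes a Future[Done], not the accumulated text.
  val sinkForEach: Sink[ByteString, Future[Done]] = Sink.foreach[ByteString](byteString => fileString =
    fileString.concat(byteString.decodeString("US-ASCII")))
  /*
  def fold[U, T](zero: U)(f: (U, T) ⇒ U): Sink[T, Future[U]] =
    Flow[T].fold(zero)(f).toMat(Sink.head)(Keep.right).named("foldSink")
  */
  val sinkFold: Sink[ByteString, Future[String]] = Sink.fold[String, ByteString]("") { case (acc, str) =>
    // Decode each chunk; a bare acc + str would append ByteString.toString, not the text.
    acc + str.decodeString("US-ASCII")
  }
  // Running both sinks materializes the source twice; res1 is kept only to show its type.
  val res1: Future[Done] = source.runWith(sinkForEach)
  val res2: Future[String] = source.runWith(sinkFold)
  res2
}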