Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

mapping a Stream with a function returning a Future

I sometimes find myself in a situation where I have some Stream[X], and a function X => Future Y, that I'd like to combine to a Future[Stream[Y]], and I can't seem to find a way to do it. For example, I have

val x = (1 until 10).toStream
def toFutureString(value : Integer) = Future(value toString)

val result : Future[Stream[String]] = ???

I tried

 val result = Future.Traverse(x, toFutureString)

which gives the correct result, but seems to consume the entire stream before returning the Future, which more or less defeats the purpse

I tried

val result = x.flatMap(toFutureString)

but that doesn't compile with type mismatch; found : scala.concurrent.Future[String] required: scala.collection.GenTraversableOnce[?]

val result = x.map(toFutureString)

returns the somewhat odd and useless Stream[Future[String]]

What should I do here to get things fixed?

Edit: I'm not stuck on a Stream, I'd be equally happy with the same operation on an Iterator, as long as it won't block on evaluating all items before starting to process the head

Edit2: I'm not 100% sure that the Future.Traverse construct needs to traverse the entire stream before returning a Future[Stream], but I think it does. If it doesn't, that's a fine answer in itself.

Edit3: I also don't need the result to be in order, I'm fine with the stream or iterator returned being whatever order.

like image 934
Martijn Avatar asked Aug 04 '13 14:08

Martijn


1 Answers

You're on the right track with traverse, but unfortunately it looks like the standard library's definition is a little broken in this case—it shouldn't need to consume the stream before returning.

Future.traverse is a specific version of a much more general function that works on any applicative functor wrapped in a "traversable" type (see these papers or my answer here for more information, for example).

The Scalaz library provides this more general version, and it works as expected in this case (note that I'm getting the applicative functor instance for Future from scalaz-contrib; it's not yet in the stable versions of Scalaz, which are still cross-built against Scala 2.9.2, which doesn't have this Future):

import scala.concurrent._
import scalaz._, Scalaz._, scalaz.contrib.std._

import ExecutionContext.Implicits.global

def toFutureString(value: Int) = Future(value.toString)

val result: Future[Stream[String]] = Stream.from(0) traverse toFutureString

This returns immediately on an infinite stream, so we know for sure that it's not being consuming first.


As a footnote: If you look at the source for Future.traverse you'll see that it's implemented in terms of foldLeft, which is convenient, but not necessary or appropriate in the case of streams.

like image 134
Travis Brown Avatar answered Oct 07 '22 02:10

Travis Brown