Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why Source.fromIterator expects a Function0[Iterator[T]] as a parameter instead of Iterator[T]?

Based on: source code

I don't get why the parameter of Source.fromIterator is Function0[Iterator[T]] instead of Iterator[T].

Is there a pratical reason for this? Could we change the signature to def fromIterator(iterator: => Iterator[T]) instead ? (to avoid doing Source.fromIterator( () => myIterator) )

like image 315
john Avatar asked Nov 18 '16 17:11

john


1 Answers

As per the docs:

The iterator will be created anew for each materialization, which is the reason the method takes a function rather than an iterator directly.

Stream stages are supposed to be re-usable so you can materialize them more than one. A given iterator, however, can (often) be consumed one time only. If fromIterator created a Source that referred to an existing iterator (whether passed by name or reference) a second attempt to materialize it could fail because the underlying iterator would be exhausted.

To get around this, the source needs to be able to instantiate a new iterator, so fromIterator allows you to supply the necessary logic to do this as a supplier function.

Here's an example of something we don't want to happen:

implicit val system = akka.actor.ActorSystem.create("test")
implicit val mat = akka.stream.ActorMaterializer(system)

val iter = Iterator.range(0, 2)
// pretend we pass the iterator directly...
val src = Source.fromIterator(() => iter)

Await.result(src.runForEach(println), 2.seconds)
// 0
// 1
// res0: akka.Done = Done

Await.result(src.runForEach(println), 2.seconds)
// res1: akka.Done = Done
// No results???

That's bad because the Source src is not re-usable since it doesn't give the same output on subsequent runs. However if we create the iterator lazily it works:

val iterFunc = () => Iterator.range(0, 2)
val src = Source.fromIterator(iterFunc)

Await.result(src.runForEach(println), 2.seconds)
// 0
// 1
// res0: akka.Done = Done

Await.result(src.runForEach(println), 2.seconds)
// 0
// 1
// res1: akka.Done = Done
like image 66
Mikesname Avatar answered Oct 29 '22 12:10

Mikesname