I am able to write a Stream-processing function, drop(n,s), that scales to very large streams. But when I write another function, nth(n,s), that takes a stream s and forwards it to drop(n,s), it seems that nth() is "holding on to the head" of the stream. This results in an OutOfMemoryError for large n.
Here's the code:
import scala.annotation.tailrec
import scala.collection.immutable.Stream.cons
object Streams {
def iterate[A](start: A, f: A => A): Stream[A] =
cons(start, iterate(f(start), f))
def inc(n:Int) = n + 1
def naturals() =
iterate(1, inc)
@tailrec def drop[T](n : Int, s : Stream[T]) : Stream[T] =
if (n <= 0 || s.isEmpty) s
else drop(n-1, s.tail)
// @inline didn't seem to help
def nth[T](n : Int, s : Stream[T]) =
drop(n,s).head
def N = 1e7.toInt
def main(args: Array[String]) {
println(drop(N,naturals()).head) // works fine for large N
println(nth(N, naturals())) // results in OutOfMemoryError for N set to 1e7.toInt and -Xmx10m
}
}
My experience over on this Java question: why does this Java method leak—and why does inlining it fix the leak? leads me to believe that the Scala-generated code for nth() is not being aggressive enough in clearing s before calling drop(). The Clojure library trick (Java trick) (see that linked question) doesn't work here because all Scala function parameters are vals (not vars) so they can't be assigned (null).
How can I write a scalable nth() in terms of drop()?
Here is a related Scala compiler bug from 2009-2011 (reduceLeft() implemented in terms of foldLeft()): https://issues.scala-lang.org/browse/SI-2239
I can't tell from that Scala bug ticket, how they fixed it. There was a suggestion in the ticket that the only way to fix it was to duplicate the foldLeft() code in reduceLeft(). I really hope that isn't the answer.
Update: Andrey Tyukin's answer https://stackoverflow.com/a/52209383/156550 fixes it. Now I have:
// have to call by name (s) here, otherwise we hold on to head!
def nth[T](n : Int, s : => Stream[T]) =
drop(n,s).head
And nth(n,s) scales fine.
Here is a quick-&-dirty solution that takes just two extra characters:
def nth[T](n : Int, s: => Stream[T]) = drop(n,s).head
Here is what happens without the =>:
s is passed as argument to nth, it is a reference to an already existing value, previously created by naturals(). .head, the drop(n, s) has to return the stream to the stack frame of nth, therefore the stackframe of nth cannot be discarded, and so nth holds on to the reference s.s is kept by the stack frame of nth while drop is working, all the dropped prefixes cannot actually be garbage-collected (this is because Stream guarantees that if you hold on to its head and iterate through it multiple times, it will return the same results).Now, if you add =>, then the following happens:
nth still cannot be discarded because of .headnth does not hold the reference to the head of the stream passed to drop. It only holds a reference to a thunk that produces the Stream.Additional note (Dima's test case):
Note that if the thunk itself simply returns a reference to an already existing Stream, then the behavior is still the same as without =>. For example, if your inc were defined as follows:
def inc(i: Int): Int = {
println(System.currentTimeMillis())
i + 1
}
then invoking
val s = naturals()
nth(10, s)
nth(5, s)
would print the current time only ten times (not 15).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With