
Scala, Eratosthenes: Is there a straightforward way to replace a stream with an iteration?

I wrote a function that generates primes indefinitely (Wikipedia: incremental sieve of Eratosthenes) using streams. It returns a stream, but it also merges streams of prime multiples internally to mark upcoming composites. The definition is concise, functional, elegant and easy to understand, if I do say so myself:

def primes(): Stream[Int] = {
  def merge(a: Stream[Int], b: Stream[Int]): Stream[Int] = {
    def next = a.head min b.head
    Stream.cons(next, merge(if (a.head == next) a.tail else a,
                            if (b.head == next) b.tail else b))
  }
  def test(n: Int, compositeStream: Stream[Int]): Stream[Int] = {
    if (n == compositeStream.head) test(n+1, compositeStream.tail)
    else Stream.cons(n, test(n+1, merge(compositeStream, Stream.from(n*n, n))))
  }
  test(2, Stream.from(4, 2))
}

But I get a "java.lang.OutOfMemoryError: GC overhead limit exceeded" when I try to generate the 1000th prime.

I have an alternative solution that returns an iterator over primes and uses a priority queue of tuples (multiple, prime used to generate multiple) internally to mark upcoming composites. It works well, but it takes about twice as much code, and I basically had to restart from scratch:

import scala.collection.mutable.PriorityQueue
def primes(): Iterator[Int] = {
  // Each tuple is (next composite, prime whose multiples it steps through)
  object CompositeGeneratorOrdering extends Ordering[(Long, Int)] {
    def compare(a: (Long, Int), b: (Long, Int)) = b._1 compare a._1
  }
  var n = 2;
  val composites = PriorityQueue(((n*n).toLong, n))(CompositeGeneratorOrdering)
  def advance = {
    while (n == composites.head._1) { // n is composite
      while (n == composites.head._1) { // duplicate composites
        val (multiple, prime) = composites.dequeue
        composites.enqueue((multiple + prime, prime))
      }
      n += 1
    }
    assert(n < composites.head._1)
    val prime = n
    n += 1
    composites.enqueue((prime.toLong * prime.toLong, prime))
    prime
  }
  Iterator.continually(advance)
}

Is there a straightforward way to translate the code with streams to code with iterators? Or is there a simple way to make my first attempt more memory efficient?

It's easier to think in terms of streams; I'd rather start that way, then tweak my code if necessary.

asked Jan 08 '14 01:01 by stewSquared



3 Answers

I guess it's a bug in the current Stream implementation.

primes().drop(999).head works fine:

primes().drop(999).head
// Int = 7919

You'll get an OutOfMemoryError with a stored Stream like this:

val prs = primes()

prs.drop(999).head
// Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded

The problem here is with the implementation of class Cons: it holds not only the computed tail but also the function that computes that tail, even after the tail has been computed and the function is no longer needed.

In this case the functions are extremely heavy, so you get an OutOfMemoryError even with only 1000 of them stored.

We have to drop those functions somehow.

The intuitive fix fails:

val prs = primes().iterator.toStream

prs.drop(999).head
// Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit exceeded

Calling iterator on a Stream gives you a StreamIterator, and StreamIterator#toStream gives you back the initial heavy Stream.

Workaround

So we have to convert it manually:

def toNewStream[T](i: Iterator[T]): Stream[T] =
  if (i.hasNext) Stream.cons(i.next, toNewStream(i))
  else Stream.empty

val prs = toNewStream(primes().iterator)
// Stream[Int] = Stream(2, ?)

prs.drop(999).head
// Int = 7919
answered Nov 09 '22 22:11 by senia



In your first code, you should postpone the merging until the square of a prime is seen amongst the candidates. This drastically reduces the number of streams in use and, with it, the memory footprint. To get the 1000th prime, 7919, we only need to consider primes not above its square root, 88. That's just 23 primes (and streams of their multiples) instead of 999, or 22 if we ignore the evens from the outset. For the 10,000th prime, it's the difference between having 9999 streams of multiples and just 66. And for the 100,000th, only 189 are needed.
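The stream counts quoted above can be checked with a few lines of code. The following is a minimal sketch, not part of the original answer: `BasePrimeCount` and `basePrimes` are hypothetical names, and a plain boolean-array sieve is used only to count the primes whose square does not exceed a given number.

```scala
object BasePrimeCount {
  /** Counts the primes p with p * p <= n, i.e. the base primes needed to sieve up to n. */
  def basePrimes(n: Long): Int = {
    // limit = floor(sqrt(n)), found by integer search to avoid floating-point rounding
    var limit = 1
    while ((limit + 1L) * (limit + 1L) <= n) limit += 1
    val composite = new Array[Boolean](limit + 1)
    var count = 0
    for (i <- 2 to limit) {
      if (!composite(i)) {
        count += 1
        // mark the multiples of i, starting at i * i
        for (j <- i * i to limit by i) composite(j) = true
      }
    }
    count
  }
}
```

For example, `BasePrimeCount.basePrimes(7919)` gives 23, `BasePrimeCount.basePrimes(104729)` gives 66, and `BasePrimeCount.basePrimes(1299709)` gives 189, matching the figures for the 1000th, 10,000th and 100,000th primes.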

The trick is to separate the primes being consumed from the primes being produced, via a recursive invocation:

def primes(): Stream[Int] = {
  def merge(a: Stream[Int], b: Stream[Int]): Stream[Int] = {
    def next = a.head min b.head
    Stream.cons(next, merge(if (a.head == next) a.tail else a,
                            if (b.head == next) b.tail else b))
  }
  def test(n: Int, q: Int, 
                   compositeStream: Stream[Int], 
                   primesStream: Stream[Int]): Stream[Int] = {
    if (n == q) test(n+2, primesStream.tail.head*primesStream.tail.head,
                          merge(compositeStream, 
                                Stream.from(q, 2*primesStream.head).tail),
                          primesStream.tail)
    else if (n == compositeStream.head) test(n+2, q, compositeStream.tail,
                                                     primesStream)
    else Stream.cons(n, test(n+2, q, compositeStream, primesStream))
  }
  Stream.cons(2, Stream.cons(3, Stream.cons(5, 
     test(7, 25, Stream.from(9, 6), primes().tail.tail))))
}

As an added bonus, there's no need to store the squares of primes as Longs. This will also be much faster and have better algorithmic complexity (time and space) as it avoids doing a lot of superfluous work. Ideone testing shows it runs at about ~ n^1.5..1.6 empirical orders of growth in producing up to n = 80,000 primes.
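An empirical order of growth is simply the exponent k for which a run time of the form t ~ n^k fits two measurements. A minimal sketch of that calculation follows; `EmpiricalOrder` and `order` are hypothetical names, and the timings in the usage note are made up for illustration rather than taken from any test run.

```scala
object EmpiricalOrder {
  /** Exponent k such that t ~ n^k passes through both (n1, t1) and (n2, t2). */
  def order(n1: Double, t1: Double, n2: Double, t2: Double): Double =
    math.log(t2 / t1) / math.log(n2 / n1)
}
```

With illustrative timings of 1 second for 10,000 primes and 8 seconds for 40,000 primes, `EmpiricalOrder.order(10000, 1.0, 40000, 8.0)` is log 8 / log 4, i.e. about 1.5.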

There's still an algorithmic problem here: the structure that is created here is still a linear left-leaning structure (((mults_of_2 + mults_of_3) + mults_of_5) + ...), with more frequently-producing streams situated deeper inside it (so the numbers have more levels to percolate through, going up). The right-leaning structure should be better, mults_of_2 + (mults_of_3 + (mults_of_5 + ...)). Making it a tree should bring a real improvement in time complexity (pushing it down typically to about ~ n^1.2..1.25). For a related discussion, see this haskellwiki page.

The "real" imperative sieve of Eratosthenes usually runs at around ~ n^1.1 (in n primes produced) and an optimal trial division sieve at ~ n^1.40..1.45. Your original code runs at about cubic time, or worse. Using an imperative mutable array is usually the fastest, working by segments (a.k.a. the segmented sieve of Eratosthenes).

In the context of your second code, this is how it is achieved in Python.
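The Python code referred to is not reproduced here. As a rough Scala sketch of the same postponement idea applied to an iterator with mutable state, the following uses a `HashMap` instead of the priority queue from the question. The names `PostponedSieve`, `oddPrimesAfter7` and `add` are hypothetical, and this is an illustration written under those assumptions rather than a translation of the linked code. It relies on `Iterator.++` taking its argument by name, so the recursive supply of base primes is only created once it is needed.

```scala
import scala.collection.mutable

object PostponedSieve {
  /** All primes, lazily: a short prefix followed by the sieved odd candidates. */
  def primes(): Iterator[Long] = Iterator(2L, 3L, 5L, 7L) ++ oddPrimesAfter7()

  private def oddPrimesAfter7(): Iterator[Long] = new Iterator[Long] {
    // next odd composite -> step (2 * prime) of the progression that produces it
    private val sieve = mutable.HashMap.empty[Long, Long]
    // separate, recursively created supply of base primes: 3, 5, 7, 11, ...
    private val base = primes().drop(1)
    private var p = base.next() // current base prime (3)
    private var q = p * p       // its square (9): the point at which its multiples are added
    private var c = 7L          // last candidate examined

    // record the next multiple of this progression that is not already claimed
    private def add(start: Long, step: Long): Unit = {
      var m = start
      while (sieve.contains(m)) m += step
      sieve(m) = step
    }

    def hasNext: Boolean = true

    def next(): Long = {
      var found = 0L
      while (found == 0L) {
        c += 2
        sieve.remove(c) match {
          case Some(step) => add(c + step, step) // known composite: advance its progression
          case None if c < q => found = c        // unmarked and below the next square: prime
          case None =>                           // c == q: only now start p's multiples
            add(c + 2 * p, 2 * p)
            p = base.next()
            q = p * p
        }
      }
      found
    }
  }
}
```

With this sketch, `PostponedSieve.primes().drop(999).next()` yields 7919, and at that point the map holds entries only for the odd primes not above 88.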

answered Nov 10 '22 00:11 by Will Ness



Is there a straightforward way to translate the code with streams to code with iterators? Or is there a simple way to make my first attempt more memory efficient?

@Will Ness has given you an improved answer using Streams and explained why your code takes so much memory and time (it adds streams too early and builds a left-leaning linear structure), but no one has completely answered the second (or perhaps main) part of your question: can a true incremental Sieve of Eratosthenes be implemented with Iterators?

First, we should properly credit this right-leaning algorithm, of which your first code is a crude (left-leaning) example, since it prematurely adds all prime composite streams to the merge operations. The algorithm is due to Richard Bird, as given in the Epilogue of Melissa E. O'Neill's definitive paper on incremental Sieves of Eratosthenes.

Second, no, it isn't really possible to substitute Iterators for Streams in this algorithm, as it depends on moving through a stream without restarting it. Although one can access the head of an iterator (the current position), using the next value (skipping over the head) to generate the rest of the iteration as a stream requires building a completely new iterator at a terrible cost in memory and time. However, we can use an Iterator to output the resulting sequence of primes in order to minimize memory use and make it easy to use iterator higher-order functions, as you will see in my code below.

Now, Will Ness has walked you through the principles of postponing adding prime composite streams to the calculations until they are needed. This works well when one is storing them in a structure such as a Priority Queue or a HashMap, and was even missed in the O'Neill paper. For the Richard Bird algorithm, however, it is not necessary, as future stream values are not accessed until needed and so are not stored if the Streams are being properly lazily built (as is lazily and left-leaning). In fact, this algorithm doesn't even need the memoization and overheads of a full Stream, as each composite number culling sequence only moves forward without reference to any past primes; all one needs is a separate source of the base primes, which can be supplied by a recursive call of the same algorithm.

For ready reference, let's list the Haskell code of the Richard Bird algorithm as follows:

primes = 2:([3..] `minus` composites)
  where
    composites = union [multiples p | p <- primes]
    multiples n = map (n*) [n..]
    (x:xs) `minus` (y:ys)
      | x < y = x:(xs `minus` (y:ys))
      | x == y = xs `minus` ys
      | x > y = (x:xs) `minus` ys
    union = foldr merge []
      where
        merge (x:xs) ys = x:merge' xs ys
        merge' (x:xs) (y:ys)
          | x < y = x:merge' xs (y:ys)
          | x == y = x:merge' xs ys
          | x > y = y:merge' (x:xs) ys

In the following code I have simplified the "minus" function (called "minusStrtAt"), as we don't need to build a completely new stream but can incorporate the composite subtraction operation into the generation of the original (in my case odds-only) sequence. I have also simplified the "union" function (renaming it "mrgMltpls").

The stream operations are implemented with a non-memoizing generic Co Inductive Stream (CIS) class, where the first field is the value at the current position of the stream and the second is a thunk (a zero-argument function that returns the next value of the stream through embedded closure arguments to another function).
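As a minimal illustration of such a stream, separate from the full listing that follows, here is the same two-field class used for a plain arithmetic progression. The helpers `from` and `toIterator` are hypothetical names introduced for this sketch; `toIterator` uses the same `Iterator.iterate(...).map(...)` pattern as the full listing.

```scala
object CisSketch {
  // one stream cell: the current value plus a thunk that builds the next cell on demand
  class CIS[A](val v: A, val cont: () => CIS[A])

  // hypothetical helper: the progression start, start + step, start + 2 * step, ...
  def from(start: Long, step: Long): CIS[Long] =
    new CIS(start, () => from(start + step, step))

  // hypothetical helper: expose a CIS as an Iterator that only holds on to the current cell
  def toIterator[A](s: CIS[A]): Iterator[A] =
    Iterator.iterate(s)(_.cont()).map(_.v)
}
```

For instance, `CisSketch.toIterator(CisSketch.from(9L, 6L)).take(4).toList` gives `List(9, 15, 21, 27)`, the first odd multiples of 3 from its square onwards. Nothing is cached: calling `cont()` twice on the same cell builds two separate successor cells.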

def primes(): Iterator[Long] = {
  // generic class as a Co Inductive Stream element
  class CIS[A](val v: A, val cont: () => CIS[A])

  def mltpls(p: Long): CIS[Long] = {
    val px2 = p * 2
    def nxtmltpl(cmpst: Long): CIS[Long] =
      new CIS(cmpst, () => nxtmltpl(cmpst + px2))
    nxtmltpl(p * p)
  }
  def allMltpls(mps: CIS[Long]): CIS[CIS[Long]] =
    new CIS(mltpls(mps.v), () => allMltpls(mps.cont()))
  def merge(a: CIS[Long], b: CIS[Long]): CIS[Long] =
    if (a.v < b.v) new CIS(a.v, () => merge(a.cont(), b))
    else if (a.v > b.v) new CIS(b.v, () => merge(a, b.cont()))
    else new CIS(b.v, () => merge(a.cont(), b.cont()))
  def mrgMltpls(mlps: CIS[CIS[Long]]): CIS[Long] =
    new CIS(mlps.v.v, () => merge(mlps.v.cont(), mrgMltpls(mlps.cont())))
  def minusStrtAt(n: Long, cmpsts: CIS[Long]): CIS[Long] =
    if (n < cmpsts.v) new CIS(n, () => minusStrtAt(n + 2, cmpsts))
    else minusStrtAt(n + 2, cmpsts.cont())
  // the following are recursive, where cmpsts uses oddPrms and
  // oddPrms uses a delayed version of cmpsts in order to avoid a race
  // as oddPrms will already have a first value when cmpsts is called to generate the second
  def cmpsts(): CIS[Long] = mrgMltpls(allMltpls(oddPrms()))
  def oddPrms(): CIS[Long] = new CIS(3L, () => minusStrtAt(5L, cmpsts()))
  Iterator.iterate(new CIS(2L, () => oddPrms()))
                   {(cis: CIS[Long]) => cis.cont()}
    .map {(cis: CIS[Long]) => cis.v}
}

The above code generates the 100,000th prime (1299709) on ideone in about 1.3 seconds with about 0.36 seconds of overhead, and has an empirical order of growth of about n^1.43 up to 600,000 primes. The memory use is negligible above that used by the program code.

The above code could be implemented using the built-in Scala Streams, but there is a performance and memory use overhead (of a constant factor) that this algorithm does not require. Using Streams would mean that one could use them directly without the extra Iterator generation code, but as this is used only for final output of the sequence, it doesn't cost much.

To implement some basic tree folding as Will Ness has suggested, one only needs to add a "pairs" function and hook it into the "mrgMltpls" function:

def primes(): Iterator[Long] = {
  // generic class as a Co Inductive Stream element
  class CIS[A](val v: A, val cont: () => CIS[A])

  def mltpls(p: Long): CIS[Long] = {
    val px2 = p * 2
    def nxtmltpl(cmpst: Long): CIS[Long] =
      new CIS(cmpst, () => nxtmltpl(cmpst + px2))
    nxtmltpl(p * p)
  }
  def allMltpls(mps: CIS[Long]): CIS[CIS[Long]] =
    new CIS(mltpls(mps.v), () => allMltpls(mps.cont()))
  def merge(a: CIS[Long], b: CIS[Long]): CIS[Long] =
    if (a.v < b.v) new CIS(a.v, () => merge(a.cont(), b))
    else if (a.v > b.v) new CIS(b.v, () => merge(a, b.cont()))
    else new CIS(b.v, () => merge(a.cont(), b.cont()))
  def pairs(mltplss: CIS[CIS[Long]]): CIS[CIS[Long]] = {
    val tl = mltplss.cont()
    new CIS(merge(mltplss.v, tl.v), () => pairs(tl.cont()))
  }
  def mrgMltpls(mlps: CIS[CIS[Long]]): CIS[Long] =
    new CIS(mlps.v.v, () => merge(mlps.v.cont(), mrgMltpls(pairs(mlps.cont()))))
  def minusStrtAt(n: Long, cmpsts: CIS[Long]): CIS[Long] =
    if (n < cmpsts.v) new CIS(n, () => minusStrtAt(n + 2, cmpsts))
    else minusStrtAt(n + 2, cmpsts.cont())
  // the following are recursive, where cmpsts uses oddPrms and
  // oddPrms uses a delayed version of cmpsts in order to avoid a race
  // as oddPrms will already have a first value when cmpsts is called to generate the second
  def cmpsts(): CIS[Long] = mrgMltpls(allMltpls(oddPrms()))
  def oddPrms(): CIS[Long] = new CIS(3L, () => minusStrtAt(5L, cmpsts()))
  Iterator.iterate(new CIS(2L, () => oddPrms()))
                   {(cis: CIS[Long]) => cis.cont()}
    .map {(cis: CIS[Long]) => cis.v}
}

The above code generates the 100,000th prime (1299709) on ideone in about 0.75 seconds with about 0.37 seconds of overhead, and has an empirical order of growth of about n^1.09 up to the 1,000,000th prime (15485863), which takes 5.13 seconds. The memory use is negligible above that used by the program code.
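The shape of the "pairs" folding is perhaps easier to see on finite sorted lists than on infinite streams. The following is only an illustrative sketch with hypothetical names (`TreeMergeSketch`, `mergeAll`): it is strict rather than lazy, so it is not a drop-in replacement for the CIS version, but `mergeAll` has the same structure as `mrgMltpls`, merging the head with the fold of the paired-up rest.

```scala
object TreeMergeSketch {
  // merge two ascending lists into one ascending list without duplicates
  def merge(a: List[Int], b: List[Int]): List[Int] = (a, b) match {
    case (Nil, _) => b
    case (_, Nil) => a
    case (x :: xs, y :: ys) =>
      if (x < y) x :: merge(xs, b)
      else if (x > y) y :: merge(a, ys)
      else x :: merge(xs, ys)
  }

  // merge neighbouring lists two at a time, roughly halving the number of lists
  def pairs(ls: List[List[Int]]): List[List[Int]] = ls match {
    case a :: b :: rest => merge(a, b) :: pairs(rest)
    case other => other
  }

  // same shape as mrgMltpls: the head is merged with the fold of the paired-up rest
  def mergeAll(ls: List[List[Int]]): List[Int] = ls match {
    case Nil => Nil
    case h :: t => merge(h, mergeAll(pairs(t)))
  }
}
```

For example, `TreeMergeSketch.mergeAll(List(List(9, 15, 21, 27, 33, 39, 45), List(25, 35, 45), List(49, 63)))` gives `List(9, 15, 21, 25, 27, 33, 35, 39, 45, 49, 63)`, with the shared multiple 45 appearing once.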

Note that the above codes are completely functional in that no mutable state is used whatsoever. However, the Bird algorithm (even the tree folding version) isn't as fast as using a Priority Queue or HashMap for larger ranges, as the number of operations needed to handle the tree merging has a higher computational complexity than the log n overhead of the Priority Queue or the linear (amortized) performance of a HashMap (although hashing has a large constant-factor overhead, so that advantage isn't really seen until some truly large ranges are used).

The reason that these codes use so little memory is that the CIS streams are formulated with no permanent reference to the start of the streams, so the streams are garbage collected as they are used. That leaves only the minimal number of base prime composite sequence placeholders, which, as Will Ness has explained, is very small: only 546 base prime composite number streams for generating the first million primes up to 15485863. Each placeholder takes only a few tens of bytes: eight for the Long number, eight for the 64-bit function reference, another couple of eight-byte words for the pointer to the closure arguments, and a few more bytes for function and class overheads. That gives a total of perhaps 40 bytes per stream placeholder, or not much more than 20 kilobytes in total for generating the sequence of a million primes.

answered Nov 09 '22 23:11 by GordonBGood
