 

Avoiding memory leaks with Scalaz 7 zipWithIndex/group enumeratees

Background

As noted in this question, I'm using Scalaz 7 iteratees to process a large (i.e., unbounded) stream of data in constant heap space.

My code looks like this:

type ErrorOrT[M[+_], A] = EitherT[M, Throwable, A]
type ErrorOr[A] = ErrorOrT[IO, A]

def processChunk(c: Chunk, idx: Long): Result

def process(data: EnumeratorT[Chunk, ErrorOr]): IterateeT[Vector[(Chunk, Long)], ErrorOr, Vector[Result]] =
  Iteratee.fold[Vector[(Chunk, Long)], ErrorOr, Vector[Result]](Vector.empty) { (rs, vs) =>
    rs ++ (vs map {
      case (c, i) => processChunk(c, i)
    })
  } &= (data.zipWithIndex mapE Iteratee.group(P))
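For readers less familiar with the iteratee combinators, the enumerator side of that expression can be read step by step. The sketch below is only a restatement of the pipeline above; the intermediate names and the types in the comments are mine, added purely for illustration:

// 1. pair each chunk with its position in the stream
//    (roughly an EnumeratorT[(Chunk, Long), ErrorOr])
val indexed = data.zipWithIndex

// 2. re-chunk the indexed stream into Vectors of P elements each
//    (roughly an EnumeratorT[Vector[(Chunk, Long)], ErrorOr])
val grouped = indexed mapE Iteratee.group(P)

// 3. drive the folding iteratee with the grouped stream:
//    Iteratee.fold(...) &= grouped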

The Problem

I seem to have run into a memory leak, but I'm not familiar enough with Scalaz/FP to know whether the bug is in Scalaz or in my code. Intuitively, I'd expect this code to require only on the order of P times the Chunk size in heap, since group(P) should only ever need to buffer P chunks at a time before handing them to the fold.

Note: I found a similar question in which an OutOfMemoryError was encountered, but my code is not using consume.

Testing

I ran some tests to try to isolate the problem. In summary, the leak only appears when both zipWithIndex and group are used.

// no zipping/grouping
scala> (i1 &= enumArrs(1 << 25, 128)).run.unsafePerformIO
res47: Long = 4294967296

// grouping only
scala> (i2 &= (enumArrs(1 << 25, 128) mapE Iteratee.group(4))).run.unsafePerformIO
res49: Long = 4294967296

// zipping and grouping
scala> (i3 &= (enumArrs(1 << 25, 128).zipWithIndex mapE Iteratee.group(4))).run.unsafePerformIO
java.lang.OutOfMemoryError: Java heap space

// zipping only
scala> (i4 &= (enumArrs(1 << 25, 128).zipWithIndex)).run.unsafePerformIO
res51: Long = 4294967296

// no zipping/grouping, larger arrays
scala> (i1 &= enumArrs(1 << 27, 128)).run.unsafePerformIO
res53: Long = 17179869184

// zipping only, larger arrays
scala> (i4 &= (enumArrs(1 << 27, 128).zipWithIndex)).run.unsafePerformIO
res54: Long = 17179869184

Code for the tests:

import scalaz.iteratee._, scalaz.effect.IO, scalaz.std.vector._

// define an enumerator that produces a stream of new, zero-filled arrays
def enumArrs(sz: Int, n: Int) =
  Iteratee.enumIterator[Array[Int], IO](
    Iterator.continually(Array.fill(sz)(0)).take(n))

// define an iteratee that consumes a stream of arrays
// and computes its total length
val i1 = Iteratee.fold[Array[Int], IO, Long](0) {
  (c, a) => c + a.length
}

// define an iteratee that consumes a grouped stream of arrays
// and computes its total length
val i2 = Iteratee.fold[Vector[Array[Int]], IO, Long](0) {
  (c, as) => c + as.map(_.length).sum
}

// define an iteratee that consumes a grouped/zipped stream of arrays
// and computes its total length
val i3 = Iteratee.fold[Vector[(Array[Int], Long)], IO, Long](0) {
  (c, vs) => c + vs.map(_._1.length).sum
}

// define an iteratee that consumes a zipped stream of arrays
// and computes its total length
val i4 = Iteratee.fold[(Array[Int], Long), IO, Long](0) {
  (c, v) => c + v._1.length
}
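For anyone trying to reproduce these numbers, the test code above only needs the Scalaz core, effect, and iteratee modules on the classpath. The build snippet below is my own setup sketch -- the coordinates are the standard ones for Scalaz 7, but the version number is only an illustrative choice, not something stated in the question:

// build.sbt (illustrative; use whichever Scalaz 7.x release you are on)
libraryDependencies ++= Seq(
  "org.scalaz" %% "scalaz-core"     % "7.0.6",
  "org.scalaz" %% "scalaz-effect"   % "7.0.6",
  "org.scalaz" %% "scalaz-iteratee" % "7.0.6"
)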

Questions

  • Is the bug in my code?
  • How can I make this work in constant heap space?
Asked Oct 02 '13 by Aaron Novstrup


1 Answer

This will come as little consolation for anyone who's stuck with the older iteratee API, but I recently verified that an equivalent test passes against the scalaz-stream API, the newer stream-processing API that is intended to replace iteratee.

For completeness, here's the test code:

// create a stream containing `n` arrays with `sz` Ints in each one
def streamArrs(sz: Int, n: Int): Process[Task, Array[Int]] =
  (Process emit Array.fill(sz)(0)).repeat take n

(streamArrs(1 << 25, 1 << 14).zipWithIndex
      pipe process1.chunk(4)
      pipe process1.fold(0L) {
    (c, vs) => c + vs.map(_._1.length.toLong).sum
  }).runLast.run

This should work for any value of the n parameter (provided you're willing to wait long enough) -- I tested with 2^14 arrays of 2^25 Ints each (i.e., 128 MiB per array, or roughly 2 TiB allocated over the course of the run).
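To compile the answer's snippet you also need scalaz-stream on the classpath and a couple of imports; none of this appears in the answer itself, and the resolver and version below are only illustrative guesses for a setup of that era:

// build.sbt (illustrative; scalaz-stream releases were published to the Scalaz Bintray repository)
resolvers += "Scalaz Bintray Repo" at "http://dl.bintray.com/scalaz/releases"
libraryDependencies += "org.scalaz.stream" %% "scalaz-stream" % "0.7a"

// imports assumed by the test code above
import scalaz.concurrent.Task
import scalaz.stream.{Process, process1}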

Answered by Aaron Novstrup