
Scala: Group an Iterable into an Iterable of Iterables by a predicate

Tags:

iterator

scala

I have very large Iterators that I want to split into pieces. I have a predicate that looks at an item and returns true if it is the start of a new piece. I need the pieces to be Iterators, because even the pieces will not fit into memory. There are so many pieces that I would be wary of a recursive solution overflowing the stack. The situation is similar to this question, but I need Iterators instead of Lists, and the "sentinels" (items for which the predicate is true) occur at the beginning of a piece and should be included in it. The resulting iterators will only be used in order, though some may not be used at all, and they should only use O(1) memory. I imagine this means they should all share the same underlying iterator. Performance is important.

If I were to take a stab at a function signature, it would be this:

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] = ...

I would have loved to use takeWhile, but it consumes and discards the first element that fails the test, which here is the sentinel that should start the next piece. I investigated span, but it buffers results. My current best idea involves BufferedIterator, but maybe there is a better way.
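
To make the takeWhile problem concrete, here is a small sketch. The object name, the sample values and the "multiples of 10" predicate are illustrative assumptions, not part of the question. Reusing an iterator after calling takeWhile on it is discouraged by the standard library documentation, so the first method exists only to observe what happens to the boundary element. The second method shows how the head method of a BufferedIterator peeks without consuming.

```scala
object BoundaryDemo {
  // Illustrative predicate: multiples of 10 start a new piece.
  def startsGroup(x: Int): Boolean = x % 10 == 0

  // takeWhile has to read the sentinel (10) to learn that the piece has ended,
  // and then has nowhere to put it back.
  def viaTakeWhile: (List[Int], List[Int]) = {
    val it = Iterator(1, 2, 10, 11)
    val piece = it.takeWhile(x => !startsGroup(x)).toList
    val rest = it.toList // observation only: reuse after takeWhile is discouraged
    (piece, rest)
  }

  // BufferedIterator.head looks at the next element without consuming it.
  def viaBuffered: (List[Int], List[Int]) = {
    val it = Iterator(1, 2, 10, 11).buffered
    val piece = List.newBuilder[Int]
    while (it.hasNext && !startsGroup(it.head)) piece += it.next()
    (piece.result(), it.toList)
  }
}
```

With the standard library's Iterator, viaTakeWhile should leave only 11 in the remainder (the 10 is gone), while viaBuffered leaves 10 and 11.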

You'll know you've got it right because something like this doesn't crash your JVM:

groupby((1 to Int.MaxValue).iterator)(_ % (Int.MaxValue / 2) == 0).foreach(group => println(group.sum))
groupby((1 to Int.MaxValue).iterator)(_ % 10 == 0).foreach(group => println(group.sum))
Jay Hacker asked Nov 22 '11 18:11




4 Answers

You have an inherent problem. Iterable implies that you can get multiple iterators. Iterator implies that you can only pass through once. That means that your Iterable[Iterable[T]] should be able to produce Iterator[Iterable[T]]s. But when this returns an element--an Iterable[T]--and that asks for multiple iterators, the underlying single iterator can't comply without either caching the results of the list (too big) or calling the original iterable and going through absolutely everything again (very inefficient).
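
The difference between the two types can be seen in a few lines. This is only an illustration with made-up values; the object and field names are not from the question.

```scala
object OnePassDemo {
  val xs: Iterable[Int] = List(1, 2, 3)

  // An Iterable hands out a fresh Iterator on every call, so two passes work.
  val firstPass: Int = xs.iterator.sum
  val secondPass: Int = xs.iterator.sum

  // A single Iterator is spent after one pass.
  val it: Iterator[Int] = xs.iterator
  val total: Int = it.sum
  val anythingLeft: Boolean = it.hasNext
}
```

Both passes over xs give 6, but after total has been computed the single iterator has nothing left to offer.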

So, while you could do this, I think you should conceive of your problem in a different way.

If you could start with a Seq instead, you could grab subsets as ranges.

If you already know how you want to use your iterable, you could write a method

def process[T](source: Iterable[T])(starts: T => Boolean)(handlers: T => Unit *)

which increments through the set of handlers each time starts fires off a "true". If there's any way you can do your processing in one sweep, something like this is the way to go. (Your handlers will have to save state via mutable data structures or variables, however.)

If you can permit iteration on the outer list to break the inner list, you could have an Iterable[Iterator[T]] with the additional constraint that once you iterate to a later sub-iterator, all previous sub-iterators are invalid.
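
As a rough sketch of the handler-based process option, one possible implementation follows. Only the signature comes from the answer. The body, the rule that the first element always opens group 0, the decision to drop elements of groups that have no handler, and the sums helper are all assumptions made for illustration.

```scala
object HandlerDemo {
  // Hypothetical body for the `process` signature: elements are sent to
  // handlers(0) until `starts` fires, then to handlers(1), and so on.
  def process[T](source: Iterable[T])(starts: T => Boolean)(handlers: (T => Unit)*): Unit = {
    var group = -1
    for (x <- source) {
      // Assumption of this sketch: the first element always opens group 0.
      if (group < 0 || starts(x)) group += 1
      // Elements of groups without a handler are silently dropped.
      if (group < handlers.length) handlers(group)(x)
    }
  }

  // Handlers keep their results in mutable variables.
  def sums(xs: Iterable[Int]): (Long, Long) = {
    var first = 0L
    var second = 0L
    process(xs)(_ % 10 == 0)(x => first += x, x => second += x)
    (first, second)
  }
}
```

For example, HandlerDemo.sums(List(1, 2, 3, 10, 11, 12, 20, 21)) sums the first two groups and ignores the third, because only two handlers were supplied.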


Here's a solution of the last type (from Iterator[T] to Iterator[Iterator[T]]; one can wrap this to make the outer layers Iterable instead).

class GroupedBy[T](source: Iterator[T])(starts: T => Boolean)
extends Iterator[Iterator[T]] {
  private val underlying = source
  private var saved: T = _       // most recently fetched element
  private var cached = false     // true while `saved` has not been handed out
  private var starting = false   // true if `saved` begins a new group
  private def cacheNext(): Unit = {
    saved = underlying.next()
    starting = starts(saved)
    cached = true
  }
  // Returns Nothing so that it can be used where a T is expected
  private def oops(): Nothing = throw new java.util.NoSuchElementException("empty iterator")
  // Comment the next line if you do NOT want the first element to always start a group
  if (underlying.hasNext) { cacheNext(); starting = true }
  def hasNext: Boolean = {
    // Skips any unread elements of the current group
    while (!(cached && starting) && underlying.hasNext) cacheNext()
    cached && starting
  }
  def next(): Iterator[T] = {
    if (!(cached && starting) && !hasNext) oops()
    starting = false
    new Iterator[T] {
      var presumablyMore = true
      def hasNext: Boolean = {
        if (!cached && !starting && underlying.hasNext && presumablyMore) cacheNext()
        presumablyMore = cached && !starting
        presumablyMore
      }
      def next(): T = {
        if (presumablyMore && (cached || hasNext)) {
          cached = false
          saved
        }
        else oops()
      }
    }
  }
}
Rex Kerr answered Oct 20 '22 23:10


Here's my solution using BufferedIterator. It doesn't let you skip iterators correctly, but it's fairly simple and functional. The first element(s) go into a group even if !startsGroup(first).

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
  new Iterator[Iterator[T]] {
    val base = iter.buffered
    override def hasNext = base.hasNext  
    override def next() = Iterator(base.next()) ++ new Iterator[T] {
      override def hasNext = base.hasNext && !startsGroup(base.head) 
      override def next() = if (hasNext) base.next() else Iterator.empty.next()
    }
  }

Update: Keeping a little state lets you skip iterators and prevent people from messing with previous ones:

def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
new Iterator[Iterator[T]] {
  val base = iter.buffered
  var prev: Iterator[T] = Iterator.empty
  override def hasNext = base.hasNext  
  override def next() = {
    while (prev.hasNext) prev.next()        // Exhaust previous iterator; take* and drop* do NOT always work!!  (Jira SI-5002?)
    prev = Iterator(base.next()) ++ new Iterator[T] {
      var hasMore = true
      override def hasNext = { hasMore = hasMore && base.hasNext && !startsGroup(base.head) ; hasMore } 
      override def next() = if (hasNext) base.next() else Iterator.empty.next()
    }
    prev
  }
}
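
A small usage sketch of the updated version, showing both in-order consumption and skipping. The groupby body is the one above, repeated only so that the snippet runs on its own. The object name, the sample data and the two helper methods are illustrative assumptions.

```scala
object GroupByDemo {
  // Same logic as the updated groupby, repeated so the snippet is self-contained.
  def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
    new Iterator[Iterator[T]] {
      val base = iter.buffered
      var prev: Iterator[T] = Iterator.empty
      override def hasNext: Boolean = base.hasNext
      override def next(): Iterator[T] = {
        while (prev.hasNext) prev.next() // drain whatever the caller left unread
        prev = Iterator(base.next()) ++ new Iterator[T] {
          var hasMore = true
          override def hasNext: Boolean = {
            hasMore = hasMore && base.hasNext && !startsGroup(base.head)
            hasMore
          }
          override def next(): T = if (hasNext) base.next() else Iterator.empty.next()
        }
        prev
      }
    }

  // Made-up sample input: groups start at multiples of 10.
  def sample: Iterator[Int] = Iterator(1, 2, 3, 10, 11, 12, 20, 21)

  // Consume every group in order.
  def allSums: List[Int] = groupby(sample)(_ % 10 == 0).map(_.sum).toList

  // Read only the even-numbered groups; the skipped group is drained
  // by the next call to next() on the outer iterator.
  def everyOther: List[List[Int]] =
    groupby(sample)(_ % 10 == 0).zipWithIndex.collect {
      case (group, i) if i % 2 == 0 => group.toList
    }.toList
}
```

Here allSums yields 6, 33 and 41, and everyOther yields the first and third groups intact even though the second was never read.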
Jay Hacker answered Oct 21 '22 01:10


If memory is the main constraint, the following will work, provided your underlying Iterable supports views. This implementation walks the Iterable once and produces IterableViews, each of which can then be iterated over. It does not matter whether the very first element tests as the start of a group, since it begins the first group regardless.

def groupby[T](iter: Iterable[T])(startsGroup: T => Boolean): Iterable[Iterable[T]] = new Iterable[Iterable[T]] {
  def iterator = new Iterator[Iterable[T]] {
    val i = iter.iterator
    var index = 0
    var nextView: IterableView[T, Iterable[T]] = getNextView()
    private def getNextView() = {
      val start = index
      var hitStartGroup = false
      while ( i.hasNext && ! hitStartGroup ) {
        val next = i.next()
        index += 1
        hitStartGroup = ( index > 1 && startsGroup( next ) )
      }
      if ( hitStartGroup ) {
        if ( start == 0 ) iter.view( start, index - 1 )
        else iter.view( start - 1, index - 1 )
      } else { // hit end
        if ( start == index ) null
        else if ( start == 0 ) iter.view( start, index )
        else iter.view( start - 1, index )
      }
    }
    def hasNext = nextView != null
    def next() = {
      if ( nextView != null ) {
        val next = nextView
        nextView = getNextView()
        next
      } else null
    }
  }
}
Neil Essy answered Oct 21 '22 01:10


You can maintain a low memory footprint by using Streams. Use result.toIterator if you need an iterator again.

With streams, there's no mutable state, only a single conditional and it's nearly as concise as Jay Hacker's solution.

 def batchBy[A,B](iter: Iterator[A])(f: A => B): Stream[(B, Iterator[A])] = {
    val base = iter.buffered
    val empty = Stream.empty[(B,  Iterator[A])]

    def getBatch(key: B) = {
      Iterator(base.next()) ++ new Iterator[A] {
        def hasNext: Boolean = base.hasNext && (f(base.head) == key)
        def next(): A = base.next()
      }
    }

    def next(skipList: Option[Iterator[A]] = None): Stream[(B, Iterator[A])] = {
      skipList.foreach{_.foreach{_=>}}

      if (base.isEmpty) empty
      else {
        val key = f(base.head)
        val batch = getBatch(key)

        Stream.cons((key, batch), next(Some(batch)))
      }
    }

    next()
  }

I ran the tests:

scala> batchBy((1 to Int.MaxValue).iterator)(_ % (Int.MaxValue / 2) == 0)
         .foreach{case(_,group) => println(group.sum)}
-1610612735
1073741823
-536870909
2147483646
2147483647

The second test prints too much to paste to Stack Overflow.

agarman answered Oct 21 '22 01:10