I have very large Iterators that I want to split into pieces. I have a predicate that looks at an item and returns true if it is the start of a new piece. I need the pieces to be Iterators, because even the pieces will not fit into memory. There are so many pieces that I would be wary of a recursive solution blowing the stack. The situation is similar to this question, but I need Iterators instead of Lists, and the "sentinels" (items for which the predicate is true) occur (and should be included) at the beginning of a piece. The resulting iterators will only be used in order, though some may not be used at all, and they should only use O(1) memory. I imagine this means they should all share the same underlying iterator. Performance is important.
If I were to take a stab at a function signature, it would be this:
def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] = ...
I would have loved to use takeWhile, but it loses the last element. I investigated span, but it buffers results. My current best idea involves BufferedIterator, but maybe there is a better way.
You'll know you've got it right because something like this doesn't crash your JVM:
groupby((1 to Int.MaxValue).iterator)(_ % (Int.MaxValue / 2) == 0).foreach(group => println(group.sum))
groupby((1 to Int.MaxValue).iterator)(_ % 10 == 0).foreach(group => println(group.sum))
You have an inherent problem. Iterable implies that you can get multiple iterators. Iterator implies that you can only pass through once. That means that your Iterable[Iterable[T]] should be able to produce Iterator[Iterable[T]]s. But when this returns an element (an Iterable[T]) and that asks for multiple iterators, the underlying single iterator can't comply without either caching the results of the list (too big) or calling the original iterable and going through absolutely everything again (very inefficient).
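To make the distinction concrete, here is a minimal sketch of the two contracts. The object and method names (SinglePassDemo, traverseTwice, drainThenCheck) are made up for illustration and are not part of any library.

```scala
object SinglePassDemo {
  // An Iterable hands out a fresh, independent Iterator on every call,
  // so two full traversals are possible.
  def traverseTwice(xs: Iterable[Int]): (List[Int], List[Int]) =
    (xs.iterator.toList, xs.iterator.toList)

  // An Iterator is consumed as it is read: once drained, nothing is left
  // for a second pass unless the elements were cached somewhere.
  def drainThenCheck(it: Iterator[Int]): (Int, Boolean) = {
    var count = 0
    while (it.hasNext) { it.next(); count += 1 }
    (count, it.hasNext)
  }

  def main(args: Array[String]): Unit = {
    println(traverseTwice(List(1, 2, 3)))      // both passes see every element
    println(drainThenCheck(Iterator(1, 2, 3))) // three elements read, then exhausted
  }
}
```

This is why an Iterable[T] built on top of one shared Iterator cannot honor a second call to iterator without caching or restarting.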
So, while you could do this, I think you should conceive of your problem in a different way.
If you could start with a Seq instead, you could grab subsets as ranges.
If you already know how you want to use your iterable, you could write a method
def process[T](source: Iterable[T])(starts: T => Boolean)(handlers: T => Unit *)
which increments through the set of handlers each time starts fires off a "true". If there's any way you can do your processing in one sweep, something like this is the way to go. (Your handlers will have to save state via mutable data structures or variables, however.)
If you can permit iteration on the outer list to break the inner list, you could have an Iterable[Iterator[T]] with the additional constraint that once you iterate to a later sub-iterator, all previous sub-iterators are invalid.
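The single-sweep option with handlers could look roughly like the sketch below. The signature follows the one given above; the body is only one possible reading of it. Two details are assumptions, not something the description specifies: the first item never advances to the next handler, and handlers are reused cyclically once they run out.

```scala
object ProcessSketch {
  def process[T](source: Iterable[T])(starts: T => Boolean)(handlers: (T => Unit)*): Unit = {
    require(handlers.nonEmpty, "need at least one handler")
    var current = 0   // index of the handler that receives the current piece
    var first = true
    for (item <- source) {
      // Assumption: the very first item always belongs to the first handler,
      // and the handler index wraps around when the handlers are used up.
      if (!first && starts(item)) current = (current + 1) % handlers.length
      first = false
      handlers(current)(item)
    }
  }

  // Handlers keep their state in mutable variables, as noted above.
  def demo(): (Int, Int) = {
    var sumA = 0
    var sumB = 0
    process(List(1, 2, 3, 10, 11))(_ % 10 == 0)(x => sumA += x, x => sumB += x)
    (sumA, sumB) // (6, 21): 1+2+3 for the first piece, 10+11 for the second
  }
}
```

Nothing is buffered here: each item is handed to exactly one handler and then dropped, so memory use stays constant regardless of piece size.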
Here's a solution of the last type (from Iterator[T] to Iterator[Iterator[T]]; one can wrap this to make the outer layers Iterable instead).
class GroupedBy[T](source: Iterator[T])(starts: T => Boolean)
extends Iterator[Iterator[T]] {
  private val underlying = source
  private var saved: T = _
  private var cached = false
  private var starting = false
  // Pull one element from the source and remember whether it starts a group
  private def cacheNext(): Unit = {
    saved = underlying.next()
    starting = starts(saved)
    cached = true
  }
  // Declared as Nothing so it can be used where a T is expected
  private def oops(): Nothing = throw new java.util.NoSuchElementException("empty iterator")
  // Comment the next line if you do NOT want the first element to always start a group
  if (underlying.hasNext) { cacheNext(); starting = true }
  def hasNext = {
    // Skip whatever is left of the current group until the next group start
    while (!(cached && starting) && underlying.hasNext) cacheNext()
    cached && starting
  }
  def next() = {
    if (!(cached && starting) && !hasNext) oops()
    starting = false
    new Iterator[T] {
      var presumablyMore = true
      def hasNext = {
        if (!cached && !starting && underlying.hasNext && presumablyMore) cacheNext()
        presumablyMore = cached && !starting
        presumablyMore
      }
      def next() = {
        if (presumablyMore && (cached || hasNext)) {
          cached = false
          saved
        }
        else oops()
      }
    }
  }
}
Here's my solution using BufferedIterator. It doesn't let you skip iterators correctly, but it's fairly simple and functional. The first element(s) go into a group even if !startsGroup(first).
def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
  new Iterator[Iterator[T]] {
    val base = iter.buffered
    override def hasNext = base.hasNext
    override def next() = Iterator(base.next()) ++ new Iterator[T] {
      override def hasNext = base.hasNext && !startsGroup(base.head)
      override def next() = if (hasNext) base.next() else Iterator.empty.next()
    }
  }
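As a small illustration of both the in-order behavior and the skipping caveat, here is the same definition run on a tiny input. The wrapper object and the helper names (SimpleGroupbyDemo, inOrder, skipping) are made up for this example; the only change to the definition is explicit result types.

```scala
object SimpleGroupbyDemo {
  // Same definition as above, repeated so the example is self-contained.
  def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
    new Iterator[Iterator[T]] {
      val base = iter.buffered
      override def hasNext: Boolean = base.hasNext
      override def next(): Iterator[T] = Iterator(base.next()) ++ new Iterator[T] {
        override def hasNext: Boolean = base.hasNext && !startsGroup(base.head)
        override def next(): T = if (hasNext) base.next() else Iterator.empty.next()
      }
    }

  // Consuming every group fully, in order, gives the expected pieces.
  def inOrder(): List[List[Int]] =
    groupby(Iterator(1, 2, 3, 10, 11, 20, 21))(_ % 10 == 0).map(_.toList).toList
  // List(List(1, 2, 3), List(10, 11), List(20, 21))

  // Skipping a group without consuming it does not jump to the next sentinel:
  // the second group simply continues from wherever the shared iterator stands.
  def skipping(): List[Int] = {
    val groups = groupby(Iterator(1, 2, 3, 10, 11, 20, 21))(_ % 10 == 0)
    groups.next()        // first group obtained but never consumed
    groups.next().toList // List(2, 3) rather than List(10, 11)
  }
}
```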
Update: Keeping a little state lets you skip iterators and prevent people from messing with previous ones:
def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
  new Iterator[Iterator[T]] {
    val base = iter.buffered
    var prev: Iterator[T] = Iterator.empty
    override def hasNext = base.hasNext
    override def next() = {
      while (prev.hasNext) prev.next() // Exhaust previous iterator; take* and drop* do NOT always work!! (Jira SI-5002?)
      prev = Iterator(base.next()) ++ new Iterator[T] {
        var hasMore = true
        override def hasNext = { hasMore = hasMore && base.hasNext && !startsGroup(base.head) ; hasMore }
        override def next() = if (hasNext) base.next() else Iterator.empty.next()
      }
      prev
    }
  }
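A quick check of the skipping behavior with this stateful version, again on a tiny input. The wrapper object and the skipping helper are hypothetical names for this example; the definition is the one above with explicit result types added.

```scala
object SkippableGroupbyDemo {
  // Same definition as the updated version above, repeated for self-containment.
  def groupby[T](iter: Iterator[T])(startsGroup: T => Boolean): Iterator[Iterator[T]] =
    new Iterator[Iterator[T]] {
      val base = iter.buffered
      var prev: Iterator[T] = Iterator.empty
      override def hasNext: Boolean = base.hasNext
      override def next(): Iterator[T] = {
        while (prev.hasNext) prev.next() // drain whatever the caller left unread
        prev = Iterator(base.next()) ++ new Iterator[T] {
          var hasMore = true
          override def hasNext: Boolean = {
            hasMore = hasMore && base.hasNext && !startsGroup(base.head)
            hasMore
          }
          override def next(): T = if (hasNext) base.next() else Iterator.empty.next()
        }
        prev
      }
    }

  def skipping(): (List[Int], Boolean) = {
    val groups = groupby(Iterator(1, 2, 3, 10, 11, 20, 21))(_ % 10 == 0)
    val first = groups.next()         // never consumed by the caller
    val second = groups.next().toList // next() drains `first` before starting a new group
    (second, first.hasNext)           // (List(10, 11), false)
  }
}
```

The skipped group is exhausted as a side effect, so it cannot be read later. That matches the requirement that groups are only used in order.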
If memory is the constraint, the following will work, but only if your underlying Iterable supports views. This implementation iterates over the Iterable and generates IterableViews, which can then be iterated over. It does not check whether the very first element tests as a group start, since the first element always begins a group.
def groupby[T](iter: Iterable[T])(startsGroup: T => Boolean): Iterable[Iterable[T]] = new Iterable[Iterable[T]] {
  def iterator = new Iterator[Iterable[T]] {
    val i = iter.iterator
    var index = 0
    var nextView: IterableView[T, Iterable[T]] = getNextView()
    private def getNextView() = {
      val start = index
      var hitStartGroup = false
      while ( i.hasNext && ! hitStartGroup ) {
        val next = i.next()
        index += 1
        hitStartGroup = ( index > 1 && startsGroup( next ) )
      }
      if ( hitStartGroup ) {
        if ( start == 0 ) iter.view( start, index - 1 )
        else iter.view( start - 1, index - 1 )
      } else { // hit end
        if ( start == index ) null
        else if ( start == 0 ) iter.view( start, index )
        else iter.view( start - 1, index )
      }
    }
    def hasNext = nextView != null
    def next() = {
      if ( nextView != null ) {
        val next = nextView
        nextView = getNextView()
        next
      } else null
    }
  }
}
You can maintain a low memory footprint by using Streams. Use result.toIterator if you need an iterator again.
With streams, there's no mutable state, only a single conditional and it's nearly as concise as Jay Hacker's solution.
def batchBy[A,B](iter: Iterator[A])(f: A => B): Stream[(B, Iterator[A])] = {
  val base = iter.buffered
  val empty = Stream.empty[(B, Iterator[A])]
  def getBatch(key: B) = {
    Iterator(base.next()) ++ new Iterator[A] {
      def hasNext: Boolean = base.hasNext && (f(base.head) == key)
      def next(): A = base.next()
    }
  }
  def next(skipList: Option[Iterator[A]] = None): Stream[(B, Iterator[A])] = {
    skipList.foreach{_.foreach{_=>}}
    if (base.isEmpty) empty
    else {
      val key = f(base.head)
      val batch = getBatch(key)
      Stream.cons((key, batch), next(Some(batch)))
    }
  }
  next()
}
I ran the tests:
scala> batchBy((1 to Int.MaxValue).iterator)(_ % (Int.MaxValue / 2) == 0)
.foreach{case(_,group) => println(group.sum)}
-1610612735
1073741823
-536870909
2147483646
2147483647
The second test prints too much to paste to Stack Overflow.