 

How to copy iterator in Scala?

Tags: iterator, scala

About duplicate

This is NOT a duplicate of How to clone an iterator?

Please do not blindly close this question: the answers given in the so-called duplicate DO NOT work here. The OP there had a different problem, and the answers obviously fitted HIS problem, but not mine.

Not every similar question is a duplicate. SE has the notion of an "expansion question": sometimes the only way to get different, working answers is to ask again on the same subject.

Problem

I have an iterator. I would like to get a copy (duplicate) of it, so that I can then proceed with the original and the copy completely independently.

Important

Copying through reflection or serialization is a no-go (performance penalty).

Example

var list = List(1,2,3,4,5)
var it1 = list.iterator
it1.next()

var it2 = it1   // (*)
it2.next()

println(it1.next())

This simply makes it2 a reference to it1, so advancing it1 advances it2 as well, and vice versa.

The example above uses a List. I am currently struggling with a HashMap, but the question is a general one: all I have is an iterator.

Approach #1

If you edit line (*) and write:

var it2 = it1.toList.iterator

(this was suggested as the solution in the linked question), an exception is thrown when the program runs.
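A minimal sketch of why this fails, assuming the rest of the example stays unchanged: toList walks it1 to its end, so the later call to it1.next() finds nothing left. The object and method names (ToListDrainsSource, demo) are made up for illustration.

```scala
object ToListDrainsSource {
  // Returns (it1.hasNext after toList, first element of it2, whether it1.next() threw).
  def demo(): (Boolean, Int, Boolean) = {
    val it1 = List(1, 2, 3, 4, 5).iterator
    it1.next() // consumes 1

    // toList traverses it1 completely, so it1 is exhausted afterwards.
    val it2 = it1.toList.iterator

    val originalHasNext = it1.hasNext // nothing is left in the original
    val firstOfCopy = it2.next()      // the copy continues where it1 stopped

    // Calling next() on the drained original is what raises the exception.
    val threw =
      try { it1.next(); false }
      catch { case _: NoSuchElementException => true }

    (originalHasNext, firstOfCopy, threw)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

So the copy itself works; it is the original that can no longer be used.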

Approach #2

"You take the list and...". No, I don't. I don't have a list, I have an iterator. In general I don't know anything about the collection that underlies the iterator; the only thing I have is the iterator. I have to "fork" it.

greenoldman asked Oct 17 '11 18:10


4 Answers

You can't duplicate an iterator without destroying it. The contract for an iterator is that it can only be traversed once.

The question you linked to shows how to get two copies in exchange for the one you've destroyed. You cannot keep using the original, but you can now run the two new copies forward independently.

Rex Kerr answered Oct 01 '22 15:10


It's pretty easy to create a List iterator that you can duplicate without destroying it: this is basically the definition of the iterator method copied from the List source with a fork method added:

class ForkableIterator[A] (list: List[A]) extends Iterator[A] {
    // remaining, not yet consumed part of the list
    private var these = list
    def hasNext: Boolean = !these.isEmpty
    def next(): A =
      if (hasNext) {
        val result = these.head; these = these.tail; result
      } else Iterator.empty.next()
    // a fork shares the immutable tail, so creating it copies nothing
    def fork = new ForkableIterator(these)
}

Use:

scala> val it = new ForkableIterator(List(1,2,3,4,5,6))
it: ForkableIterator[Int] = non-empty iterator

scala> it.next
res72: Int = 1

scala> val it2 = it.fork
it2: ForkableIterator[Int] = non-empty iterator

scala> it2.next
res73: Int = 2

scala> it2.next
res74: Int = 3

scala> it.next
res75: Int = 2

I had a look at doing this for HashMap but it seems more complicated (partly because there are different map implementations depending on collection size). So it is probably best to use the above implementation on yourMap.toList.
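A rough sketch of that suggestion, assuming an immutable HashMap named yourMap with made-up contents. The class is repeated so the snippet compiles on its own, and ForkMapDemo and demo are hypothetical names. Note that toList copies all entries once, which costs memory proportional to the map size.

```scala
import scala.collection.immutable.HashMap

// Same idea as the class above, repeated here so the sketch is self-contained.
class ForkableIterator[A](list: List[A]) extends Iterator[A] {
  private var these = list
  def hasNext: Boolean = these.nonEmpty
  def next(): A =
    if (hasNext) { val result = these.head; these = these.tail; result }
    else Iterator.empty.next()
  def fork: ForkableIterator[A] = new ForkableIterator(these)
}

object ForkMapDemo {
  type Entry = (String, Int)

  // Returns (snapshot of the map, rest seen by the fork, rest seen by the original).
  def demo(): (List[Entry], List[Entry], List[Entry]) = {
    val yourMap = HashMap("a" -> 1, "b" -> 2, "c" -> 3) // sample data, assumed
    val snapshot = yourMap.toList                        // one-off copy of the entries

    val it = new ForkableIterator(snapshot)
    it.next()                                            // advance the original by one
    val it2 = it.fork                                    // fork at the current position

    val restViaFork = it2.toList                         // draining the fork...
    val restViaOriginal = it.toList                      // ...does not affect the original
    (snapshot, restViaFork, restViaOriginal)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Both iterators see the same remaining entries, in whatever order the map happened to produce them.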

Luigi Plinge answered Oct 01 '22 15:10


As Rex said, it is impossible to make a copy of an Iterator without destroying it. That said, what is the problem with duplicate?

var list = List(1,2,3,4,5)
var it1 = list.iterator
it1.next()

val (it1a, it1b) = it1.duplicate
it1 = it1a
var it2 = it1b
it2.next()

println(it1.next())
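
The same pattern can be sketched for the HashMap case from the question, where only the iterator is known. The map contents and the names DuplicateDemo and demo are assumptions for illustration. Keep in mind that duplicate may buffer the elements one copy has consumed and the other has not, so memory use grows with the distance between the two copies.

```scala
import scala.collection.immutable.HashMap

object DuplicateDemo {
  type Entry = (String, Int)

  // Returns (expected remaining entries, entries seen by it2, entries seen by it1).
  def demo(): (List[Entry], List[Entry], List[Entry]) = {
    val source = HashMap("a" -> 1, "b" -> 2, "c" -> 3) // sample data, assumed
    val expected = source.toList.tail                   // everything after the first entry

    var it1: Iterator[Entry] = source.iterator
    it1.next()                                          // advance past the first entry

    // Trade the old iterator for two new ones; the old one must not be used again.
    val (left, right) = it1.duplicate
    it1 = left
    val it2 = right

    val seenByIt2 = it2.toList                          // drain the copy first
    val seenByIt1 = it1.toList                          // the other copy is unaffected
    (expected, seenByIt2, seenByIt1)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```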

Daniel C. Sobral answered Oct 01 '22 14:10


I think this is a very good question; it's a pity that many people did not understand the value of the problem. In the age of Big Data there are many situations where we have a stream rather than a materialized list, because the data cannot be collected or does not fit into memory, and replaying it from the very beginning is costly too. What can we do if we need two (or more) separate calculations over the data? For example, we may need to calculate min, max, sum, md5, etc. using already written functions, in different threads, with only one pass over the data.

The general solution is to use Akka-Stream, which handles this. But is it possible with Iterator, which is the easiest way in Java/Scala to represent such a streaming data source? The answer is yes, although we could NOT "proceed with original and copy completely independently", in the sense that we have to synchronize the consumption speed of the consumer threads. (Akka-Stream does this using back-pressure and some intermediate buffers.)

So here is my simple solution: use a Phaser. With it we can build an Iterator wrapper over a one-pass source. The same wrapper object is used in each consumer thread as a plain Iterator. To use it you have to know the number of consuming threads in advance. Also, each consumer thread MUST drain the source to the end to avoid hanging all the others (using the flush() method, for example).

import java.util.concurrent.Phaser
import java.util.concurrent.atomic.AtomicBoolean

// it0 - input source iterator
// num - exact number of consuming threads. We have to know it in advance.
case class ForkableIterator[+A]( it0: Iterator[A], num: Int ) extends Phaser(num) with Iterator[A] {

  // serial replicator: every source element is emitted num times, once per consumer
  val it = it0.flatMap( a => Iterator.fill(num)(a) )

  private val hasNext0 = new AtomicBoolean( it0.hasNext )
  override def hasNext: Boolean = hasNext0.get()

  override def next(): A = {
    arriveAndAwaitAdvance()
    val next = it.synchronized {
      val next = it.next()
      if (hasNext0.get) hasNext0.set(it.hasNext)
      next
    }
    arriveAndAwaitAdvance() // otherwise the tasks lock up at the last data element
    next
  }

  // If a consumer stops reading before the end of the source data stream,
  // it HAS to drain the rest to avoid blocking the others. (Note: Phaser has no "unregister" method?)
  // Calling it can be skipped if all consumers read exactly the same amount of data,
  // e.g. until the very end.
  def flush(): Unit = while (hasNext) next()
}

PS I have successfully used this "ForkableIterator" with Spark to perform several independent aggregations over a long stream of source data. In that case I did not have to bother creating threads manually. You may also use Scala Futures, Monix Tasks, etc.

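PPS I rechecked the JDK Phaser specification and found that it actually does have an "unregister" method, called arriveAndDeregister(). So a consumer that finishes early can call it instead of flush(). Note that the replicator above still emits every element num times, so it would need adjusting as well once a consumer deregisters.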

Michael Shestero answered Oct 01 '22 16:10