Iterate over lines in a file in parallel (Scala)?

I know about the parallel collections in Scala. They are handy! However, I would like to iterate over the lines of a file that is too large for memory in parallel. I could create threads and set up a lock over a Scanner, for example, but it would be great if I could run code such as:

Source.fromFile(path).getLines.par foreach { line => ... }

Unfortunately, however, this fails to compile:

error: value par is not a member of Iterator[String]

What is the easiest way to accomplish some parallelism here? For now, I will read in some lines and handle them in parallel.

asked Jul 19 '11 by schmmd

3 Answers

You could use the grouped method to slice the iterator into chunks that fit in memory and then process each chunk in parallel.

val chunkSize = 128 * 1024 // lines per chunk, not bytes
val iterator = Source.fromFile(path).getLines.grouped(chunkSize)
iterator.foreach { lines =>
  lines.par.foreach { line => process(line) }
}

In my opinion, something like this is the simplest way to do it.
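
For what it's worth, here is a self-contained sketch of the same approach, written against Scala 2.13+, where parallel collections moved to the separate scala-parallel-collections module; the 10000-line chunk size and the process stub are arbitrary choices for illustration:

import scala.io.Source
import scala.collection.parallel.CollectionConverters._ // provides .par on Scala 2.13+

object ChunkedParallel {
  // Stand-in for whatever per-line work you actually need.
  def process(line: String): Unit = println(line.length)

  def main(args: Array[String]): Unit = {
    val source = Source.fromFile(args(0))
    try {
      // Only one 10000-line chunk is held in memory at a time;
      // the lines within each chunk are processed in parallel.
      source.getLines().grouped(10000).foreach(_.par.foreach(process))
    } finally source.close()
  }
}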

answered by Joshua Hartman


I'll put this as a separate answer since it's fundamentally different from my last one (and it actually works).

Here's an outline of a solution using actors, which is basically what Kim Stebel's comment describes. There are two actor classes: a single FileReader actor that reads individual lines from the file on demand, and several Worker actors. The workers all send requests for lines to the reader and process the lines in parallel as they come back.

I'm using Akka actors here, but the same idea works with any actor implementation.

case object LineRequest
case object BeginProcessing

class FileReader extends Actor {

  // Reads a single line from the file, or returns None at EOF.
  def getLine: Option[String] = ???

  def receive = {
    case LineRequest => self.sender.foreach { _ ! getLine } // sender is an Option[ActorRef] in Akka 1.x
  }
}

class Worker(reader: ActorRef) extends Actor {

  // Whatever needs to happen to each line goes here.
  def process(line: String): Unit = ???

  def receive = {
    case BeginProcessing => reader ! LineRequest
    case Some(line: String) => {
      process(line)
      reader ! LineRequest
    }
    case None => self.stop
  }
}

val reader = actorOf[FileReader].start
val workers = Vector.fill(4)(actorOf(new Worker(reader)).start)
workers.foreach { _ ! BeginProcessing }
// wait for the workers to stop...

This way, no more than 4 (or however many workers you have) unprocessed lines are in memory at a time.
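
The Akka 1.x API above has long since been removed, so here is a rough sketch of the same bounded-memory, request-driven idea using plain java.util.concurrent tools instead of actors; the queue capacity, worker count, and process stub are illustrative, not from the original answer:

import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import scala.io.Source

object BoundedQueueReader {
  def process(line: String): Unit = println(line.length) // placeholder work

  def main(args: Array[String]): Unit = {
    // The queue's capacity bounds how many unprocessed lines sit in memory:
    // put() blocks when it is full, giving natural backpressure.
    val queue = new ArrayBlockingQueue[Option[String]](4)
    val pool = Executors.newFixedThreadPool(4)
    (1 to 4).foreach { _ =>
      pool.execute { () =>
        var done = false
        while (!done) queue.take() match {
          case Some(line) => process(line)
          case None => done = true // poison pill: this worker is finished
        }
      }
    }
    val source = Source.fromFile(args(0))
    try source.getLines().foreach(l => queue.put(Some(l)))
    finally source.close()
    (1 to 4).foreach(_ => queue.put(None)) // one poison pill per worker
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.HOURS)
  }
}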

answered by Dan Simon


The following also worked for me; note, though, that toStream memoizes every line it reads, so on a file that genuinely doesn't fit in memory this can still exhaust the heap:

source.getLines.toStream.par.foreach(line => println(line))
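
As a quick sanity check that the work really fans out across threads, printing the thread name works (a small test file is assumed, since the stream keeps every line it has read in memory; the file name is hypothetical):

import scala.io.Source

val source = Source.fromFile("test.txt") // hypothetical small test file
source.getLines.toStream.par.foreach { line =>
  println(s"${Thread.currentThread.getName}: $line") // shows which pool thread ran
}
source.close()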

answered by Mahesh Pujari