Scala, read file, process lines and write output to a new file using concurrent (akka), asynchronous APIs (nio2)

1: I'm running into a problem trying to process a large text file (10 GB+).

The single-threaded solution is the following:

val writer = new PrintWriter(new File(output.getOrElse("output.txt")));
for(line <- scala.io.Source.fromFile(file.getOrElse("data.txt")).getLines())
{
  writer.println(DigestUtils.HMAC_SHA_256(line))
}
writer.close()

2: I tried concurrent processing using

val futures = scala.io.Source.fromFile(file.getOrElse("data.txt")).getLines
               .map{ s => Future{ DigestUtils.HMAC_SHA_256(s) } }.to[Vector]
val results = futures.map{ Await.result(_, 10000 seconds) }

This yields a GC overhead limit exceeded error (see Appendix A for the stack trace).

3: I tried using Akka IO in combination with AsynchronousFileChannel, following https://github.com/drexin/akka-io-file. I am able to read the file in byte chunks using FileSlurp, but I have not been able to find a solution to read the file line by line, which is a requirement.

Any help would be greatly appreciated. Thank you.

APPENDIX A

[error] (run-main) java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at java.nio.CharBuffer.wrap(Unknown Source)
        at sun.nio.cs.StreamDecoder.implRead(Unknown Source)
        at sun.nio.cs.StreamDecoder.read(Unknown Source)
        at java.io.InputStreamReader.read(Unknown Source)
        at java.io.BufferedReader.fill(Unknown Source)
        at java.io.BufferedReader.readLine(Unknown Source)
        at java.io.BufferedReader.readLine(Unknown Source)
        at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:67)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:716)
        at scala.collection.immutable.VectorBuilder.$plus$plus$eq(Vector.scala:692)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at com.test.Twitterhashconcurrentcli$.doConcurrent(Twitterhashconcurrentcli.scala:35)
        at com.test.Twitterhashconcurrentcli$delayedInit$body.apply(Twitterhashconcurrentcli.scala:62)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.App$$anonfun$main$1.apply(App.scala:71)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
        at scala.App$class.main(App.scala:71)
asked Dec 02 '22 by alexm

2 Answers

The trick here is to avoid reading all the data into memory at once. If you simply iterate over the lines and fire them at workers, you run exactly that risk: sending to an actor is asynchronous, so you can end up reading the whole file into memory while it sits in the actors' mailboxes, most likely leading to an OOM error. A better high-level approach is a single master actor with a pool of child workers underneath it for the processing. The master holds a lazy stream over the file (such as the Iterator returned from scala.io.Source.fromX) and the workers use a work-pulling pattern, which keeps their mailboxes from filling up with data. Then, when the iterator has no more lines, the master stops itself, which in turn stops the workers (and, if necessary, you can also use this point to shut down the actor system, if that's what you really want to do).

Here is a very rough outline. Please note that I have not tested this yet:

import akka.actor._
import akka.routing.RoundRobinRouter
import scala.io.Source
import akka.routing.Broadcast

object FileReadMaster{
  case class ProcessFile(filePath:String)
  case class ProcessLines(lines:List[String], last:Boolean = false)
  case class LinesProcessed(lines:List[String], last:Boolean = false)

  case object WorkAvailable
  case object GimmeeWork
}

class FileReadMaster extends Actor{
  import FileReadMaster._

  val workChunkSize = 10
  val workersCount = 10

  def receive = waitingToProcess

  def waitingToProcess:Receive = {
    case ProcessFile(path) =>
      val workers = (for(i <- 1 to workersCount) yield context.actorOf(Props[FileReadWorker])).toList
      val workersPool = context.actorOf(Props.empty.withRouter(RoundRobinRouter(routees = workers)))
      val it = Source.fromFile(path).getLines
      workersPool ! Broadcast(WorkAvailable)
      context.become(processing(it, workersPool, workers.size))

      //Setup deathwatch on all
      workers foreach (context watch _)
  }

  def processing(it:Iterator[String], workers:ActorRef, workersRunning:Int):Receive = {
    case ProcessFile(path) => 
      sender ! Status.Failure(new Exception("already processing!!!"))


    case GimmeeWork if it.hasNext =>
      val lines = List.fill(workChunkSize){
        if (it.hasNext) Some(it.next)
        else None
      }.flatten

      //last == true when this chunk exhausted the iterator
      sender ! ProcessLines(lines, !it.hasNext)

      //If no more lines, broadcast poison pill
      if (!it.hasNext) workers ! Broadcast(PoisonPill)

    case GimmeeWork =>
      //get here if no more work left

    case LinesProcessed(lines, last) =>
      //Do something with the lines

    //Termination for last worker
    case Terminated(ref)  if workersRunning == 1 =>
      //Done with all work, do what you gotta do when done here

    //Terminated for a non-last worker
    case Terminated(ref) =>
      context.become(processing(it, workers, workersRunning - 1))

  }
}

class FileReadWorker extends Actor{
  import FileReadMaster._

  def receive = {
    case ProcessLines(lines, last) => 
      sender ! LinesProcessed(lines.map(_.reverse), last)
      sender ! GimmeeWork

    case WorkAvailable =>
      sender ! GimmeeWork
  }
}

The idea is that the master iterates over a file's contents and sends chunks of work to a pool of child workers. When the file processing starts, the master tells all the children that work is available. Each child then continues to request work until there is no more work left. When the master detects that the file is done being read, it broadcasts a poison pill to the children which will let them finish any outstanding work and then stop. When all of the children stop, the master can finish whatever cleanup is needed.
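
To kick it off, a minimal usage sketch might look like the following (the actor system name and the file path are illustrative placeholders):

import akka.actor.{ActorSystem, Props}

object FileReadApp extends App {
  //Create the actor system, start the master and hand it the file to process
  val system = ActorSystem("file-processing")
  val master = system.actorOf(Props[FileReadMaster], "file-read-master")
  master ! FileReadMaster.ProcessFile("data.txt")

  //The master decides when everything is done (all workers have terminated)
  //and can shut the system down at that point if desired
}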

Again, this is very rough based on what I think you are asking about. If I'm off in any area, let me know and I can amend the answer.

answered Jan 11 '23 by cmbaxter

In fact, in the parallel variant you first read the whole file into memory as a list of lines, and then make a copy of it (with the to method). Evidently, this causes the OOME.

To parallelize, first decide whether it is worth doing at all. You should not parallelize reading from a sequential file (nor writing to one): that only causes excessive movement of the disk heads and slows things down. Parallelization only makes sense if DigestUtils.HMAC_SHA_256(s) takes comparable or greater time than reading a line; benchmark both to find out. Then, if you decide that parallelizing the hash computation is worth it, choose the number of worker threads: the idea is that the elapsed computation time should roughly match the reading time. Let one thread read lines, pack them into batches (say, 1000 lines per batch), and put the batches into an ArrayBlockingQueue of fixed capacity (say, 1000). Batching is needed because otherwise there are too many lines, and therefore too many synchronized operations on the queue, causing contention. Let the worker threads read batches from that queue using the take method.

One more thread should write the results to "output.txt", also fed through a blocking queue. If you have to preserve the order of lines in the output file, a more complex communication facility is needed instead of the second queue, but that is another story. A rough sketch of this pipeline is shown below.
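
As a rough, untested sketch of that pipeline using plain threads and java.util.concurrent.ArrayBlockingQueue (the file names, batch size, queue capacities and worker count are placeholders to tune; hash stands in for the question's DigestUtils.HMAC_SHA_256):

import java.io.PrintWriter
import java.util.concurrent.ArrayBlockingQueue
import scala.io.Source

object BatchedHashPipeline extends App {
  val batchSize   = 1000
  val workerCount = 4
  val workQueue   = new ArrayBlockingQueue[List[String]](1000)
  val resultQueue = new ArrayBlockingQueue[List[String]](1000)
  val poison      = Nil //empty batch used as an end-of-stream marker

  //Stand-in for the hash from the question; plug in the real computation here
  def hash(line: String): String = line.reverse

  //Worker threads: take a batch, hash every line, hand the result to the writer
  val workers = (1 to workerCount).map { _ =>
    new Thread(new Runnable {
      def run(): Unit = {
        var batch = workQueue.take()
        while (batch.nonEmpty) {
          resultQueue.put(batch.map(hash))
          batch = workQueue.take()
        }
        workQueue.put(poison)   //pass the marker on to the next worker
        resultQueue.put(poison) //tell the writer this worker is done
      }
    })
  }

  //Writer thread: drain results until every worker has signalled completion
  val writerThread = new Thread(new Runnable {
    def run(): Unit = {
      val out = new PrintWriter("output.txt")
      var finished = 0
      while (finished < workerCount) {
        val batch = resultQueue.take()
        if (batch.isEmpty) finished += 1
        else batch.foreach(line => out.println(line))
      }
      out.close()
    }
  })

  workers.foreach(_.start())
  writerThread.start()

  //Reader (this thread): batch the lines and feed the work queue
  Source.fromFile("data.txt").getLines().grouped(batchSize).foreach { chunk =>
    workQueue.put(chunk.toList)
  }
  workQueue.put(poison)

  writerThread.join()
}

Note that this sketch does not preserve the order of lines in the output file, as discussed above.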

answered Jan 11 '23 by Alexei Kaigorodov