
Parallel file processing in Scala

Tags: scala, akka, actor

Suppose I need to process files in a given folder in parallel. In Java I would create a FolderReader thread to read file names from the folder and a pool of FileProcessor threads. FolderReader reads file names and submits the file processing function (Runnable) to the pool executor.
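For reference, the Java-style approach described above might be sketched in Scala roughly as follows. This is only an illustration: the names `ExecutorFolderProcessing`, `processFile` and `processFolder`, the per-file work (returning the file size) and the one-minute wait are all assumptions, not part of the original design.

```scala
import java.io.File
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ExecutorFolderProcessing {
  // Hypothetical per-file work: here it only returns the file size in bytes.
  def processFile(file: File): Long = file.length()

  // Plays the FolderReader role: lists the folder and submits one task per file.
  // Returns the number of files whose task ran to completion.
  def processFolder(folder: File, poolSize: Int): Int = {
    val pool = Executors.newFixedThreadPool(poolSize)
    val processed = new AtomicInteger(0)
    // listFiles() returns null if the folder cannot be read, hence the Option
    val files = Option(folder.listFiles()).getOrElse(Array.empty[File]).filter(_.isFile)
    files.foreach { file =>
      pool.execute(new Runnable {
        def run(): Unit = {
          processFile(file)
          processed.incrementAndGet()
        }
      })
    }
    pool.shutdown()                             // accept no new tasks
    pool.awaitTermination(1, TimeUnit.MINUTES)  // wait for the submitted tasks
    processed.get()
  }
}
```

Calling `ExecutorFolderProcessing.processFolder(new File("/some/folder"), 4)` would process the folder's regular files on four threads; the path is a placeholder.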

In Scala I see two options:

  • create a pool of FileProcessor actors and schedule a file processing function with Actors.Scheduler.
  • create an actor for each file name while reading the file names.

Does it make sense? What is the best option?

Michael asked Jul 20 '12 09:07


3 Answers

Depending on what you're doing, it may be as simple as

for (file <- files.par) {
  // process the file
}
Dave Griffith answered Sep 20 '22 08:09


I strongly suggest staying as far away from raw threads as you can. Luckily, we have better abstractions that take care of what happens underneath. In your case it seems to me that you do not need actors (although you could use them); you can use a simpler abstraction called Futures. They are part of the open-source Akka library, and I think they will become part of the Scala standard library as well.

A Future[T] is simply something that will return a T in the future.

All you need to run a future is an implicit ExecutionContext, which you can derive from a Java executor service. You can then enjoy the elegant API and the fact that a future is a monad: you can transform collections into collections of futures, collect the results, and so on. I suggest you have a look at http://doc.akka.io/docs/akka/2.0.1/scala/futures.html

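// Imports assume scala.concurrent (Scala 2.10+). With Akka 2.0 the same
// names are assumed to come from akka.dispatch and akka.util.duration._
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object TestingFutures {
  // Fixed pool of 20 threads that runs the futures below
  val executorService = Executors.newFixedThreadPool(20)
  implicit val executorContext: ExecutionContext =
    ExecutionContext.fromExecutorService(executorService)

  def testFutures(myList: List[String]): List[String] = {
    // Start one future per element and combine them into a single future of a list
    val futureOfList: Future[List[String]] = Future.traverse(myList) { aString =>
      Future { aString.reverse }
    }
    // Block until every future has completed, for at most one minute
    Await.result(futureOfList, 1.minute)
  }
}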

There's a lot going on here:

  • I am using Future.traverse, which takes as its first parameter a collection M[A] (where M is a traversable collection type) and as its second parameter a function A => Future[B], or if you prefer a Function1[A, Future[B]], and returns a Future[M[B]]
  • I am using the Future.apply method to create a Future[T] from a block of code
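Applied to the files-in-a-folder case from the question, the same Future.traverse pattern might look like the sketch below. It assumes scala.concurrent from the standard library rather than Akka 2.0, and the names `FolderFutures`, `countLines` and `lineCounts`, as well as the per-file work (counting lines of UTF-8 text files), are made up for illustration.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object FolderFutures {
  // Hypothetical per-file work: count the lines of a UTF-8 text file.
  def countLines(file: File): Int =
    Files.readAllLines(file.toPath, StandardCharsets.UTF_8).size()

  // Processes every regular file in the folder on a pool of poolSize threads
  // and returns a map from file name to line count.
  def lineCounts(folder: File, poolSize: Int): Map[String, Int] = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      // listFiles() returns null if the folder cannot be read, hence the Option
      val files = Option(folder.listFiles()).getOrElse(Array.empty[File]).filter(_.isFile).toList
      // One future per file, combined into a single future of all results
      val all: Future[List[(String, Int)]] = Future.traverse(files) { file =>
        Future { file.getName -> countLines(file) }
      }
      Await.result(all, 1.minute).toMap
    } finally {
      pool.shutdown() // release the threads once the results are in
    }
  }
}
```

Blocking with Await.result keeps the sketch short; in a larger program it is usually preferable to keep working with the resulting Future instead of blocking on it.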

There are many other reasons to look at Akka futures.

  • Futures can be mapped because they are monads, i.e. you can chain the execution of futures:

    Future { 3 }.map { _ * 2 }.map { _.toString }

  • Futures have callbacks: future.onComplete, onSuccess, onFailure, andThen, etc.

  • Futures support not only traverse, but also for comprehensions
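The three points above can be sketched together as follows. The sketch assumes scala.concurrent with the global execution context rather than Akka 2.0; the names `FutureCombinators`, `doubledAsString`, `sum` and `logWhenDone` are hypothetical. Only onComplete is shown for callbacks, since onSuccess and onFailure were removed from later versions of the standard library.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object FutureCombinators {
  // Chaining with map: each step runs once the previous result is available.
  def doubledAsString(n: Int): Future[String] =
    Future { n }.map(_ * 2).map(_.toString)

  // A for comprehension combines the results of several futures.
  // Both arguments are already running when they are passed in.
  def sum(a: Future[Int], b: Future[Int]): Future[Int] =
    for {
      x <- a
      y <- b
    } yield x + y

  // Callback: invoked when the future completes, successfully or not.
  def logWhenDone(f: Future[String]): Unit =
    f.onComplete {
      case Success(value) => println("result: " + value)
      case Failure(error) => println("failed: " + error.getMessage)
    }
}
```

For example, `FutureCombinators.logWhenDone(FutureCombinators.doubledAsString(3))` registers a callback on the chained future without blocking the calling thread.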

Edmondo1984 answered Sep 19 '22 08:09


Ideally you should use two actors. One for reading the list of files, and one for actually reading the file.

You start the process by simply sending a single "start" message to the first actor. The actor can then read the list of files, and send a message to the second actor. The second actor then reads the file and processes the contents.

Having multiple actors, which might seem complicated, is actually a good thing: you end up with a bunch of objects communicating with each other, as in a theoretical OO system.

Edit: you REALLY shouldn't be reading a single file concurrently.

darth10 answered Sep 21 '22 08:09