
Concurrent reading and processing file line by line in Scala

Suppose I need to apply two functions, f: String => A and g: A => B, to each line of a large text file, eventually producing a list of B.

Since the file is large and f and g are expensive, I would like to make the processing concurrent. I can use "parallel collections" and do something like io.Source.fromFile("data.txt").getLines.toList.par.map(l => g(f(l))), but that does not run the file reading, f, and g concurrently.

What is the best way to implement concurrency in this example?

Michael asked Dec 04 '22 01:12



2 Answers

First, an important note: don't use .par on a List, since the conversion has to copy all the data (a List can only be traversed sequentially). Use something like Vector instead, for which the .par conversion happens without copying.

It seems like you're thinking of the parallelism the wrong way. Here's what would happen:

If you have a file like this:

0
1
2
3
4
5
6
7
8
9

And functions f and g:

def f(line: String) = {
  println("running f(%s)".format(line))
  line.toInt
}

def g(n: Int) = {
  println("running g(%d)".format(n))
  n + 1
}

Then you can do:

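// Scala 2.13+: add the scala-parallel-collections module and import scala.collection.parallel.CollectionConverters._
io.Source.fromFile("data.txt").getLines().toIndexedSeq.par.map(l => g(f(l)))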

And get output:

running f(3)
running f(0)
running f(5)
running f(2)
running f(6)
running f(1)
running g(2)
running f(4)
running f(7)
running g(4)
running g(1)
running g(6)
running g(3)
running g(5)
running g(0)
running g(7)
running f(9)
running f(8)
running g(9)
running g(8)

So even though the entire g(f(l)) operation for a given line happens on a single thread, you can see that different lines are processed in parallel. Many f and g operations can be running simultaneously on separate threads, but the f and g for a particular line always run sequentially.

This is, after all, what you should expect, since there is no way to read a line, run f on it, and run g on the result all in parallel. For example, how could it execute g on the output of f if the line hasn't been read yet?
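To make that ordering concrete, here is a minimal sketch. It uses the standard library's Future instead of parallel collections (a substitution made only so the example needs no extra module), and it replaces println with a thread-safe event log so the order can be inspected afterwards. The names PerLineOrdering, events, and run are hypothetical.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object PerLineOrdering {
  // Thread-safe log of events, standing in for the println calls above.
  val events = new ConcurrentLinkedQueue[String]()

  def f(line: String): Int = { events.add("f(" + line + ")"); line.toInt }
  def g(n: Int): Int = { events.add("g(" + n + ")"); n + 1 }

  // One task per line; each task runs f and then g for its own line.
  // Tasks for different lines may interleave in any order.
  def run(lines: Seq[String]): Seq[Int] = {
    val tasks = lines.map(l => Future(g(f(l))))
    Await.result(Future.sequence(tasks), 10.seconds)
  }
}
```

After calling PerLineOrdering.run on the lines "0" to "9", the log can contain the f and g events of different lines in any interleaving, but for every line i the entry f(i) appears before g(i).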

dhg answered Jan 05 '23 01:01



You can use map on Future:

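import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// one Future per line: stringToA runs first, then aToB on its result
val futures = io.Source.fromFile(fileName).getLines().map{ s => Future{ stringToA(s) }.map{ aToB } }.toIndexedSeq

val results = futures.map{ Await.result(_, 10.seconds) }
// alternatively (instead of the line above):
val results = Await.result(Future.sequence(futures), 10.seconds)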
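As a self-contained sketch of the same idea: the object below reads a file, starts one Future per line as the iterator is drained, and collects the results in file order. The toy f and g stand in for stringToA and aToB, and the names FutureLines and process are hypothetical. The 10-second timeout is carried over from the snippet above and is not a recommendation.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.io.Source

object FutureLines {
  def f(line: String): Int = line.toInt // stand-in for stringToA
  def g(n: Int): Int = n + 1            // stand-in for aToB

  def process(path: String): Vector[Int] = {
    val source = Source.fromFile(path)
    try {
      // toVector drains the iterator on the calling thread. Each line's Future
      // is submitted as soon as that line has been read, so reading later
      // lines can overlap with f and g running on earlier ones.
      val futures = source.getLines().map(s => Future(f(s)).map(n => g(n))).toVector
      // Future.sequence keeps the input order, so results follow file order.
      Await.result(Future.sequence(futures), 10.seconds)
    } finally source.close()
  }
}
```

For a file containing the lines 0 through 4, FutureLines.process returns Vector(1, 2, 3, 4, 5), regardless of the order in which the individual Futures complete.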
senia answered Jan 05 '23 01:01
