Suppose I need to apply two functions f: String => A and g: A => B to each line in a large text file to eventually create a list of B.
Since the file is large and f and g are expensive, I would like to make the processing concurrent. I can use "parallel collections" and do something like
io.Source.fromFile("data.txt").getLines.toList.par.map(l => g(f(l)))
but that does not execute reading the file, f, and g concurrently.
What is the best way to implement concurrency in this example?
First, an important note: don't use .par on a List, since it requires copying all the data (a List can only be read sequentially). Instead, use something like a Vector, for which the .par conversion can happen without copying.
It seems like you're thinking of the parallelism the wrong way. Here's what would happen:
If you have a file like this:
0
1
2
3
4
5
6
7
8
9
And functions f and g:
def f(line: String) = {
  println("running f(%s)".format(line))
  line.toInt
}
def g(n: Int) = {
  println("running g(%d)".format(n))
  n + 1
}
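Then you can do:
// Scala 2.13+: add the scala-parallel-collections module and import scala.collection.parallel.CollectionConverters._
io.Source.fromFile("data.txt").getLines().toIndexedSeq.par.map(l => g(f(l)))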
And get output like this (the exact order varies from run to run):
running f(3)
running f(0)
running f(5)
running f(2)
running f(6)
running f(1)
running g(2)
running f(4)
running f(7)
running g(4)
running g(1)
running g(6)
running g(3)
running g(5)
running g(0)
running g(7)
running f(9)
running f(8)
running g(9)
running g(8)
So even though the entire g(f(l)) operation for a given line happens on a single thread, you can see that different lines are processed in parallel. Thus, many f and g operations can be happening simultaneously on separate threads, but the f and g for a particular line will happen sequentially.
This is, after all, what you should expect, since there's no way to read a line, run f on it, and run g on the result in parallel. For example, how could it execute g on the output of f if the line hasn't been read yet?
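To check the claim that different lines run concurrently while f and g for one line stay ordered, here is a small sketch. It swaps in scala.concurrent Futures for .par (parallel collections are a separate module on Scala 2.13+), and the event log (clock, events) and the logging bodies of f and g are hypothetical illustrations, not part of the answer above.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical event log: (stage, line) -> global sequence number
val clock = new AtomicInteger(0)
val events = TrieMap.empty[(String, String), Int]

def f(line: String): Int = {
  events.put(("f", line), clock.getAndIncrement())
  line.toInt
}

def g(n: Int): Int = {
  events.put(("g", n.toString), clock.getAndIncrement())
  n + 1
}

val lines = (0 to 9).map(_.toString)

// Each line gets its own Future, so different lines may run on different threads,
// but g(f(l)) for a single line is still one sequential computation.
val results = Await.result(Future.traverse(lines)(l => Future(g(f(l)))), 10.seconds)

// For every line, f must have been logged before g.
val fBeforeG = lines.forall(l => events(("f", l)) < events(("g", l)))

println(s"results = $results, f before g for every line: $fBeforeG")
```

Future.traverse keeps the results in input order even though the tasks may finish in any order, so results holds the values 1 to 10 in line order.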
You can use map on Future:
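import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
// stringToA and aToB play the roles of f and g from the question
val futures = io.Source.fromFile(fileName).getLines().map { s => Future { stringToA(s) }.map(aToB) }.toIndexedSeq
val results = futures.map(Await.result(_, 10.seconds))
// alternatively, wait for all of them at once:
// val results = Await.result(Future.sequence(futures), 10.seconds)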
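For a version of the Future approach that runs as-is, here is a sketch. The concrete stringToA and aToB bodies are hypothetical stand-ins for f and g, and Source.fromString replaces Source.fromFile so no data file is needed. It also closes the source after the futures have been created, which the snippet above does not do.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.io.Source

// Hypothetical stand-ins for the question's f: String => A and g: A => B
def stringToA(s: String): Int = s.trim.toInt
def aToB(a: Int): String = "#" * a

// Source.fromString stands in for Source.fromFile (assumption: in-memory data)
val source = Source.fromString("3\n1\n2")
val results: Seq[String] =
  try {
    // f for each line starts in its own Future as soon as the line is read;
    // g is chained with map, so it starts when that line's f finishes.
    // toVector forces the iterator before the source is closed.
    val futures = source.getLines().map(s => Future(stringToA(s)).map(aToB)).toVector
    // Future.sequence keeps the input order even if tasks finish out of order
    Await.result(Future.sequence(futures), 10.seconds)
  } finally source.close()

println(results.mkString(", "))
```

Because each line's f is submitted as soon as the line is read and g is chained with map, reading, f and g can overlap across different lines; within one line, g still waits for f.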