In the chapter "Programming Multicore CPUs" of the Programming Erlang book, Joe Armstrong gives a nice example of parallelization of a map function:
pmap(F, L) ->
    S = self(),
    %% make_ref() returns a unique reference
    %% we'll match on this later
    Ref = erlang:make_ref(),
    Pids = map(fun(I) ->
                   spawn(fun() -> do_f(S, Ref, F, I) end)
               end, L),
    %% gather the results
    gather(Pids, Ref).

do_f(Parent, Ref, F, I) ->
    Parent ! {self(), Ref, (catch F(I))}.

gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} -> [Ret|gather(T, Ref)]
    end;
gather([], _) ->
    [].
It works nicely, but I believe it contains a bottleneck that makes it really slow on lists with 100,000+ elements.
When gather() runs, it tries to match the first Pid from the Pids list against the messages in the main process's mailbox. But what if the oldest message in the mailbox is not from that Pid? Then receive tries each of the other messages in turn until it finds a match. So there is a real chance that each call to gather() has to scan most of the mailbox to find the message from the Pid at the head of the Pids list. For a list of size N, that is on the order of N * N message comparisons in the worst case.
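To make the worst case concrete, here is a rough count under an assumed scenario (chosen for illustration, not measured): all N replies are already queued, and they sit in the mailbox in exactly the reverse order of the Pids list. The receive for the k-th Pid then has to examine the N - k + 1 messages still in the mailbox before it reaches its match, so the total number of messages examined is:

```latex
\sum_{k=1}^{N} (N - k + 1) = \frac{N(N+1)}{2} = O(N^2)
```

For N = 100,000 that is about 5 * 10^9 pattern-match attempts, compared with 100,000 if every receive matched the oldest message.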
I have even managed to prove the existence of this bottleneck:
gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} -> [Ret|gather(T, Ref)];
        %% Here it is:
        Other -> io:format("The oldest message in the mailbox (~w) did not match with Pid ~w~n", [Other,Pid])
    end;
How can I avoid this bottleneck?
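The problem is that if you want a correct solution you still have to check that each reply comes from one of the processes you spawned, and return the results in the original order.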
Here's a solution that uses counters instead of a list of Pids, which removes the need to traverse the mailbox multiple times. Matching on Ref ensures that the messages we receive are from our children. Proper order is restored by sorting the results with lists:keysort/2 at the very end of pmap, which adds some overhead, but at O(n log n) it is well below O(n^2).
-module(test).
-compile(export_all).

pmap(F, L) ->
    S = self(),
    % make_ref() returns a unique reference
    % we'll match on this later
    Ref = erlang:make_ref(),
    % spawn one worker per element, tagging each with its index C
    Count = lists:foldl(fun(I, C) ->
                            spawn(fun() ->
                                      do_f(C, S, Ref, F, I)
                                  end),
                            C+1
                        end, 0, L),
    % gather the results
    Res = gather(0, Count, Ref),
    % reorder the results
    element(2, lists:unzip(lists:keysort(1, Res))).

do_f(C, Parent, Ref, F, I) ->
    Parent ! {C, Ref, (catch F(I))}.

gather(C, C, _) ->
    [];
gather(C, Count, Ref) ->
    receive
        % Idx is left unbound so that the oldest reply carrying Ref
        % always matches; C only counts how many replies we have taken
        {Idx, Ref, Ret} -> [{Idx, Ret}|gather(C+1, Count, Ref)]
    end.
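A quick way to convince yourself that index tagging plus a final sort keeps the input order even when replies arrive out of order is a small self-contained check. This is only a sketch: the module name pmap_check, the check/0 function, and the sleep-based worker are made up for illustration, and the pmap here is a compact restatement of the idea rather than the exact code above.

```erlang
-module(pmap_check).
-export([pmap/2, check/0]).

%% Tag each job with its index, take replies in whatever order they
%% arrive, then sort by index to restore the input order.
pmap(F, L) ->
    Parent = self(),
    Ref = erlang:make_ref(),
    Count = lists:foldl(
              fun(I, Idx) ->
                      spawn(fun() -> Parent ! {Idx, Ref, (catch F(I))} end),
                      Idx + 1
              end, 0, L),
    Tagged = gather(Count, Ref),
    [Ret || {_Idx, Ret} <- lists:keysort(1, Tagged)].

%% Idx is unbound in the pattern, so the oldest message carrying Ref
%% matches and no reply has to wait behind the others.
gather(0, _Ref) ->
    [];
gather(Remaining, Ref) ->
    receive
        {Idx, Ref, Ret} -> [{Idx, Ret} | gather(Remaining - 1, Ref)]
    end.

%% Hypothetical check: workers for small inputs sleep longest, so the
%% replies tend to arrive in reverse order. The result must still
%% equal what a sequential lists:map/2 returns.
check() ->
    L = lists:seq(1, 5),
    F = fun(X) -> timer:sleep((6 - X) * 20), X * X end,
    Expected = lists:map(F, L),
    Expected = pmap(F, L),
    ok.
```

After compiling the module, calling pmap_check:check() in the shell should return ok; a badmatch error would mean the order was lost.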