
How to optimize the receive loop for thousands of messages in Erlang?

In the chapter "Programming Multicore CPUs" of the Programming Erlang book, Joe Armstrong gives a nice example of parallelization of a map function:

pmap(F, L) ->
    S = self(),
    %% make_ref() returns a unique reference
    %% we'll match on this later
    Ref = erlang:make_ref(),
    Pids = map(fun(I) ->
        spawn(fun() -> do_f(S, Ref, F, I) end)
    end, L),
    %% gather the results
    gather(Pids, Ref).

do_f(Parent, Ref, F, I) ->
    Parent ! {self(), Ref, (catch F(I))}.

gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} -> [Ret|gather(T, Ref)]
    end;

gather([], _) ->
    [].

It works nicely, but I believe it contains a bottleneck that makes it run very slowly on lists with 100,000+ elements.

When gather() runs, it takes the first Pid from the Pids list and tries to match it against the messages in the main process's mailbox. But what if the oldest message in the mailbox is not from that Pid? Then receive tries each of the other messages in turn until it finds a match. So for every Pid taken from the list, we may have to scan the whole mailbox, which gives an O(N^2) worst case for a list of size N.
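To make the cost concrete, here is a minimal sketch (the module name mailbox_scan and its functions are made up for illustration, not taken from the book). It fills the calling process's mailbox with N tagged messages in descending order and then drains it twice: once with a bound tag, as gather/2 does with Pid, and once with an unbound tag. In the first case each receive has to skip every message still queued ahead of its match, so the scans add up to N + (N-1) + ... + 1 = N(N+1)/2 pattern tests.

```erlang
-module(mailbox_scan).
-export([run/1]).

%% Queue N tagged messages to ourselves, highest tag first, so that an
%% ascending selective receive always finds its match at the very end.
fill(N) ->
    lists:foreach(fun(K) -> self() ! {K, payload} end, lists:seq(N, 1, -1)).

%% K is already bound in the pattern, so every receive scans the mailbox
%% from the oldest message until it reaches the one tagged K.
drain_in_order(K, N) when K > N -> ok;
drain_in_order(K, N) ->
    receive
        {K, payload} -> drain_in_order(K + 1, N)
    end.

%% The tag is unbound here, so the oldest message always matches at once.
drain_any(0) -> ok;
drain_any(N) ->
    receive
        {_Tag, payload} -> drain_any(N - 1)
    end.

%% Returns {SelectiveMicros, AnyOrderMicros} as measured by timer:tc/1.
run(N) ->
    fill(N),
    {Selective, ok} = timer:tc(fun() -> drain_in_order(1, N) end),
    fill(N),
    {AnyOrder, ok} = timer:tc(fun() -> drain_any(N) end),
    {Selective, AnyOrder}.
```

Calling mailbox_scan:run(N) for a few growing values of N should show the first number growing much faster than the second; the exact timings depend on the machine and the Erlang/OTP release.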

I have even managed to prove the existence of this bottleneck:

gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} -> [Ret|gather(T, Ref)];
        %% Here it is:
        Other -> io:format("The oldest message in the mailbox (~w) did not match with Pid ~w~n", [Other,Pid])
    end;

How can I avoid this bottleneck?

skanatek asked Sep 29 '11 09:09


1 Answer

The problem is that if you want to have a correct solution you still have to:

  • check if a given reply comes from one of the processes you have spawned
  • ensure proper result order

Here's a solution that uses counters instead of a list of Pids, which eliminates the need to traverse the mailbox multiple times. Matching on Ref ensures that the messages we receive come from our children. Proper order is restored by sorting the results with lists:keysort/2 at the very end of pmap; this adds some overhead, but an O(n log n) sort is likely to cost less than the O(n^2) mailbox scanning.

-module(test).

-compile(export_all).

pmap(F, L) ->
    S = self(),
    % make_ref() returns a unique reference
    % we'll match on this later
    Ref = erlang:make_ref(),
    Count = lists:foldl(fun(I, C) ->
                                spawn(fun() ->
                                              do_f(C, S, Ref, F, I)
                                      end),
                                C+1
                        end, 0, L),
    % gather the results
    Res = gather(0, Count, Ref),
    % reorder the results
    element(2, lists:unzip(lists:keysort(1, Res))).


do_f(C, Parent, Ref, F, I) ->
    Parent ! {C, Ref, (catch F(I))}.


gather(C, C, _) ->
    [];
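gather(C, Count, Ref) ->
    receive
        % Index is unbound here, so the oldest reply carrying our Ref
        % matches no matter which child sent it; C only counts replies
        {Index, Ref, Ret} -> [{Index, Ret}|gather(C+1, Count, Ref)]
    end.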
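
As a quick sanity check of the ordering guarantee, here is a self-contained sketch of the same idea (the module name pmap_sketch and the demo/0 function are invented for this illustration). Each worker tags its reply with its position in the input list, the parent accepts replies in whatever order they arrive, and lists:keysort/2 restores the original order at the end. In demo/0 the earlier elements sleep longer, so the replies should arrive roughly in reverse.

```erlang
-module(pmap_sketch).
-export([pmap/2, demo/0]).

pmap(F, L) ->
    Parent = self(),
    Ref = erlang:make_ref(),
    %% Pair every element with its position in the input list.
    Indexed = lists:zip(lists:seq(1, length(L)), L),
    lists:foreach(
      fun({Index, I}) ->
              spawn(fun() -> Parent ! {Ref, Index, (catch F(I))} end)
      end, Indexed),
    Replies = gather(length(L), Ref, []),
    %% Sort by position, then drop the position tags.
    [Ret || {_Index, Ret} <- lists:keysort(1, Replies)].

%% Index is unbound in the pattern, so the oldest reply carrying our Ref
%% is taken first and nothing ahead of it has to be skipped.
gather(0, _Ref, Acc) -> Acc;
gather(Remaining, Ref, Acc) ->
    receive
        {Ref, Index, Ret} -> gather(Remaining - 1, Ref, [{Index, Ret} | Acc])
    end.

%% Earlier elements sleep longer, so replies tend to arrive in reverse order.
demo() ->
    Slow = fun(X) -> timer:sleep((4 - X) * 20), X * X end,
    pmap(Slow, [1, 2, 3]).
```

pmap_sketch:demo() returns [1,4,9], matching the input order even though the worker for 3 finishes first.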

gleber answered Sep 29 '22 23:09