Parallelize the reduce step

Previous Topic Next Topic
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view

Parallelize the reduce step

Alex Dashevski

I want to parallelize the reduce step of the following map-reduce code:

-import(lists, [foreach/2]).
%% mapreduce(F1, F2, Acc0, L) -> Acc
%%   F1(Pid, X)            -> sends {Key, Val} messages to Pid (the map step,
%%                            run once for each element X of L)
%%   F2(Key, [Val], AccIn) -> AccOut (the reduce step, folded over every
%%                            collected key starting from Acc0)
%% The whole computation runs in a separate reducer process; this function
%% blocks until that process replies with the final accumulator. If the
%% reducer dies before replying (e.g. F2 raises), the caller exits with
%% {mapreduce_failed, Reason} instead of waiting forever.
mapreduce(F1, F2, Acc0, L) ->
    S = self(),
    {Pid, Ref} = spawn_monitor(fun() -> reduce(S, F1, F2, Acc0, L) end),
    receive
        {Pid, Result} ->
            %% The reducer terminates right after replying; remove the
            %% monitor and any 'DOWN' already queued so it does not linger
            %% in the caller's mailbox.
            erlang:demonitor(Ref, [flush]),
            Result;
        {'DOWN', Ref, process, Pid, Reason} ->
            exit({mapreduce_failed, Reason})
    end.
%% reduce(Parent, F1, F2, Acc0, L)
%% Body of the reducer process. Spawns one linked mapper per element of L
%% (each runs F1(ReducerPid, Item)), gathers the {Key, Val} pairs they send
%% via collect_replies/2, folds F2 over the resulting dictionary starting
%% from Acc0, and sends {ReducerPid, Result} to Parent.
reduce(Parent, F1, F2, Acc0, L) ->
    %% Trap exits so every mapper's termination arrives as an
    %% {'EXIT', _, _} message, which collect_replies/2 counts down on.
    process_flag(trap_exit, true),
    Me = self(),
    lists:foreach(fun(Item) -> spawn_link(fun() -> F1(Me, Item) end) end, L),
    Groups = collect_replies(length(L), dict:new()),
    Parent ! {Me, dict:fold(F2, Acc0, Groups)}.
%% collect_replies(N, Dict) -> Dict
%% Receives {Key, Val} messages from the mapper processes and groups them in
%% Dict as Key -> [Val, ...], in arrival order. N is the number of mappers
%% still running: each {'EXIT', _, _} message (the caller traps exits)
%% decrements it, and the accumulated Dict is returned once N reaches 0.
collect_replies(0, Dict) ->
    Dict;
collect_replies(N, Dict) ->
    receive
        {Key, Val} ->
            %% dict:append/3 stores [Val] when Key is absent, so no
            %% is_key/store branch is needed.
            collect_replies(N, dict:append(Key, Val, Dict));
        {'EXIT', _Pid, _Why} ->
            %% Signals from one process arrive in order, so a mapper's own
            %% {Key, Val} messages are received before its 'EXIT'.
            collect_replies(N - 1, Dict)
    end.

erlang-questions mailing list
[hidden email]