How can I optimize a receive loop over thousands of messages in Erlang?
In the chapter "Programming Multicore CPUs" of the Programming Erlang book, Joe Armstrong gives a nice example of parallelization of a map function:

pmap(F, L) ->
    S = self(),
    %% make_ref() returns a unique reference
    %% we'll match on this later
    Ref = erlang:make_ref(),
    Pids = lists:map(fun(I) ->
                         spawn(fun() -> do_f(S, Ref, F, I) end)
                     end, L),
    %% gather the results
    gather(Pids, Ref).

do_f(Parent, Ref, F, I) ->
    Parent ! {self(), Ref, (catch F(I))}.

gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} -> [Ret|gather(T, Ref)]
    end;
gather([], _) ->
    [].
It works nicely, but I believe there is a bottleneck that makes it really slow on lists with 100,000+ elements.

When gather() executes, it tries to match the first Pid from the Pids list against a message in the main process's mailbox. But what if the oldest message in the mailbox is not from that very Pid? Then the receive scans all the other messages until it finds a match. In other words, while executing gather() we may have to walk through the entire mailbox to find the message matching the Pid we took from the Pids list. That is an N * N worst case for a list of size N.
I have even managed to prove the existence of this bottleneck:

gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} -> [Ret|gather(T, Ref)];
        %% Here it is:
        Other -> io:format("The oldest message in the mailbox (~w) did not match with Pid ~w~n", [Other, Pid])
    end;
How can I avoid this bottleneck?
3 Answers
The problem is that if you want to have a correct solution you still have to collect a reply from every process you spawned.

Here's a solution which makes use of counters instead of lists - this eliminates the necessity to traverse the inbox multiple times. Matching on Ref ensures that the messages we receive come from our children. Proper order is ensured by sorting the result with lists:keysort/2 at the very end of pmap, which adds some overhead, but it's likely to be less than O(n^2).
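The answer's code did not survive here; a minimal sketch of the counter-plus-keysort idea it describes (module, function, and variable names are mine, not from the original answer) might look like this:

```erlang
-module(pmap_counter).
-export([pmap/2]).

%% Tag each worker with its index in the input list. The parent then
%% receives exactly N replies in whatever order they arrive: since the
%% receive pattern matches any index, the oldest mailbox message always
%% matches and no mailbox scan occurs. Order is restored at the end
%% with lists:keysort/2.
pmap(F, L) ->
    S = self(),
    Ref = erlang:make_ref(),
    lists:foreach(
      fun({I, X}) ->
              spawn(fun() -> S ! {Ref, I, (catch F(X))} end)
      end,
      lists:zip(lists:seq(1, length(L)), L)),
    Replies = gather(Ref, length(L), []),
    [Ret || {_I, Ret} <- lists:keysort(1, Replies)].

%% Count replies down to zero instead of walking a Pid list.
gather(_Ref, 0, Acc) ->
    Acc;
gather(Ref, N, Acc) ->
    receive
        {Ref, I, Ret} -> gather(Ref, N - 1, [{I, Ret} | Acc])
    end.
```

The keysort costs O(N log N), which is the overhead the answer refers to - far cheaper than the O(N^2) worst-case mailbox scanning of the original gather/2.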
Joe's example is neat, but in practice you want a more heavyweight solution to your problem. Take a look at http://code.google.com/p/plists/source/browse/trunk/src/plists.erl for instance.

In general, there are three things you want to do:

Pick a work unit which is "big enough". If the work unit is too small, you die from processing overhead. If it is too big, you die from workers being idle, especially if your work is not evenly divided over the elements in the list.

Put an upper bound on the number of simultaneous workers. Psyeugenic proposes splitting by schedulers; I propose splitting by a job count limit, say 100 jobs. That is, you want to start 100 jobs and then wait until some of those complete before starting more.

Consider giving up the order of elements if possible. It is much faster if you don't need to take the order into account, and for many problems this is possible. If the order does matter, then use a dict to store the results, as proposed. It is faster for lists of large elements.

The basic rule is that as soon as you want to go parallel, you rarely want a list-based representation of your data. A list has an inherent linearity to it, which you don't want. There is a talk by Guy Steele on this very subject: http://vimeo.com/6624203
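The second point above can be sketched as follows. This is my own illustration, not code from the answer or from plists: it keeps at most Max workers in flight, starting a new job each time a reply arrives, and restores order at the end.

```erlang
-module(pmap_bounded).
-export([pmap/3]).

%% Run F over L with at most Max workers alive at once.
%% Jobs are tagged with their index so results can be re-ordered.
pmap(F, L, Max) ->
    S = self(),
    Ref = erlang:make_ref(),
    Indexed = lists:zip(lists:seq(1, length(L)), L),
    {Running, Pending} =
        case length(Indexed) > Max of
            true  -> lists:split(Max, Indexed);
            false -> {Indexed, []}
        end,
    [spawn(fun() -> S ! {Ref, I, (catch F(X))} end) || {I, X} <- Running],
    loop(F, S, Ref, Pending, length(Indexed), []).

%% N counts replies still outstanding; each reply may release one
%% pending job, so concurrency never exceeds Max.
loop(_F, _S, _Ref, _Pending, 0, Acc) ->
    [Ret || {_, Ret} <- lists:keysort(1, Acc)];
loop(F, S, Ref, Pending, N, Acc) ->
    receive
        {Ref, I, Ret} ->
            Pending2 =
                case Pending of
                    [{J, Y} | Rest] ->
                        spawn(fun() -> S ! {Ref, J, (catch F(Y))} end),
                        Rest;
                    [] ->
                        []
                end,
            loop(F, S, Ref, Pending2, N - 1, [{I, Ret} | Acc])
    end.
```

A production version would also address the first point (batching several list elements per job) the way plists does.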
In this case you can use a dict (mapping the pid of each spawned process to its index in the original list) instead of the Pids list.
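A sketch of that variant (names are mine; the answer gave no code): because the receive pattern below leaves the pid unbound, the oldest mailbox message always matches, and the dict lookup recovers which slot the result belongs to.

```erlang
-module(pmap_dict).
-export([pmap/2]).

%% Map each worker's pid to its index in the input list.
pmap(F, L) ->
    S = self(),
    Ref = erlang:make_ref(),
    PidIndex =
        dict:from_list(
          [{spawn(fun() -> S ! {self(), Ref, (catch F(X))} end), I}
           || {I, X} <- lists:zip(lists:seq(1, length(L)), L)]),
    gather(Ref, PidIndex, dict:size(PidIndex), []).

%% Accept replies in arrival order; no selective-receive scan needed.
gather(_Ref, _PidIndex, 0, Acc) ->
    [Ret || {_, Ret} <- lists:keysort(1, Acc)];
gather(Ref, PidIndex, N, Acc) ->
    receive
        {Pid, Ref, Ret} ->
            I = dict:fetch(Pid, PidIndex),
            gather(Ref, PidIndex, N - 1, [{I, Ret} | Acc])
    end.
```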