How can I optimize a receive loop over thousands of messages in Erlang?

Posted 2024-12-07


In the chapter "Programming Multicore CPUs" of the Programming Erlang book, Joe Armstrong gives a nice example of parallelization of a map function:

pmap(F, L) ->
    S = self(),
    %% make_ref() returns a unique reference
    %% we'll match on this later
    Ref = erlang:make_ref(),
    Pids = map(fun(I) ->
        spawn(fun() -> do_f(S, Ref, F, I) end)
    end, L),
    %% gather the results
    gather(Pids, Ref).

do_f(Parent, Ref, F, I) ->
    Parent ! {self(), Ref, (catch F(I))}.

gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} -> [Ret|gather(T, Ref)]
    end;

gather([], _) ->
    [].

It works nicely, but I believe there is a bottleneck in it that makes it really slow on lists with 100,000+ elements.

When the gather() function executes, it tries to match the first Pid from the Pids list against a message in the main process's mailbox. But what if the oldest message in the mailbox is not from that very Pid? Then the receive scans all the other messages until it finds a match. So there is a real chance that, on each call to gather(), we have to walk through the entire mailbox to find the message matching the Pid we took from the Pids list. For a list of size N, that is an N * N worst case.

I have even managed to prove the existence of this bottleneck:

gather([Pid|T], Ref) ->
    receive
        {Pid, Ref, Ret} -> [Ret|gather(T, Ref)];
        %% Here it is. (Note: this clause consumes the non-matching
        %% message and stops the recursion, so it is for demonstration only.)
        Other -> io:format("The oldest message in the mailbox (~w) did not match with Pid ~w~n", [Other,Pid])
    end;

How can I avoid this bottleneck?

Comments (3)

可是我不能没有你 2024-12-14 07:39:27


The problem is that if you want to have a correct solution you still have to:

  • check if a given reply comes from one of the processes you have
    spawned
  • ensure proper result order

Here's a solution that uses counters instead of lists - this eliminates the need to traverse the inbox multiple times. Matching on Ref ensures that the messages we receive come from our children. Proper order is ensured by sorting the result with lists:keysort/2 at the very end of pmap, which adds some overhead, but it is likely to be less than O(n^2).

-module(test).

-compile(export_all).

pmap(F, L) ->
    S = self(),
    % make_ref() returns a unique reference
    % we'll match on this later
    Ref = erlang:make_ref(),
    Count = lists:foldl(fun(I, C) ->
                                spawn(fun() ->
                                              do_f(C, S, Ref, F, I)
                                      end),
                                C+1
                        end, 0, L),
    % gather the results
    Res = gather(0, Count, Ref),
    % reorder the results
    element(2, lists:unzip(lists:keysort(1, Res))).


do_f(C, Parent, Ref, F, I) ->
    Parent ! {C, Ref, (catch F(I))}.


gather(C, C, _) ->
    [];
gather(C, Count, Ref) ->
    receive
        {C, Ref, Ret} -> [{C, Ret}|gather(C+1, Count, Ref)]
    end.
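For completeness, a variant of the same counter idea that does not wait for the counters in a fixed order: match only on Ref, take replies in whatever order they arrive, and sort once at the end. Every receive then matches the message at the head of the mailbox, so no selective-receive scan occurs. The module name pmap_any is mine, not from the book:

```erlang
-module(pmap_any).
-export([pmap/2]).

%% Counter-based pmap that receives replies in arrival order.
%% Each worker tags its reply with its index C; gather accepts any
%% {I, Ref, Ret} message, and a single keysort restores the order.
pmap(F, L) ->
    S = self(),
    Ref = erlang:make_ref(),
    Count = lists:foldl(fun(I, C) ->
                            spawn(fun() -> S ! {C, Ref, (catch F(I))} end),
                            C + 1
                        end, 0, L),
    Res = gather(Count, Ref, []),
    [Ret || {_, Ret} <- lists:keysort(1, Res)].

gather(0, _Ref, Acc) ->
    Acc;
gather(N, Ref, Acc) ->
    receive
        %% I is unbound here, so this matches whichever reply is oldest.
        {I, Ref, Ret} -> gather(N - 1, Ref, [{I, Ret} | Acc])
    end.
```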
晨曦÷微暖 2024-12-14 07:39:27


Joe's example is neat, but in practice you want a more heavyweight solution to your problem. Take a look at http://code.google.com/p/plists/source/browse/trunk/src/plists.erl for instance.

In general, there are three things you want to do:

  1. Pick a work unit which is "big enough". If the work unit is too small, you die by processing overhead. If it is too big, you die by workers being idle, especially if your work is not evenly divided by element count in the list.

  2. Upper-bound the number of simultaneous workers. Psyeugenic proposes splitting it by the number of schedulers; I propose splitting it by a job-count limit, say 100 jobs. That is, you start off 100 jobs and then wait until some of them complete before starting more.

  3. Consider relaxing the order of elements if possible. It is much faster if you don't need to take the order into account, and for many problems this is acceptable. If the order does matter, then use a dict to store the results, as proposed. That is faster for lists with many elements.

The basic rule is that as soon as you want parallel, you rarely want a list-based representation of your data. The list has an inherent linearity to it, which you don't want. There is a talk by Guy Steele on the very subject: http://vimeo.com/6624203
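Points 1 and 2 can be sketched roughly like this (the module name chunk_pmap and the chunking scheme are illustrative, not taken from plists): each worker maps a whole chunk rather than a single element, so the work unit is "big enough" and the number of simultaneous workers is bounded by the number of chunks:

```erlang
-module(chunk_pmap).
-export([pmap/3]).

%% Chunked pmap sketch: split L into chunks of ChunkSize, spawn one
%% worker per chunk, gather chunk results in arrival order, then
%% reassemble. With ChunkSize elements per worker, at most
%% ceil(length(L) / ChunkSize) workers run at once.
pmap(F, L, ChunkSize) ->
    Self = self(),
    Ref = make_ref(),
    Chunks = chunk(L, ChunkSize),
    N = lists:foldl(fun(Chunk, I) ->
                        spawn(fun() ->
                                  Self ! {I, Ref, [F(X) || X <- Chunk]}
                              end),
                        I + 1
                    end, 0, Chunks),
    Parts = gather(N, Ref, []),
    lists:append([P || {_, P} <- lists:keysort(1, Parts)]).

chunk([], _Size) -> [];
chunk(L, Size) when length(L) =< Size -> [L];
chunk(L, Size) ->
    {H, T} = lists:split(Size, L),
    [H | chunk(T, Size)].

gather(0, _Ref, Acc) -> Acc;
gather(N, Ref, Acc) ->
    receive
        {I, Ref, Part} -> gather(N - 1, Ref, [{I, Part} | Acc])
    end.
```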

花海 2024-12-14 07:39:27


In this case you can use a dict (mapping the pid of each spawned process to its index in the original list) as Pids instead.
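A minimal sketch of that idea, using a map (the modern counterpart of the dict module) from worker pid to the element's index in the original list; the module name pid_map_pmap is mine:

```erlang
-module(pid_map_pmap).
-export([pmap/2]).

%% pmap that records a Pid -> Index map while spawning. Replies are
%% taken in arrival order (no selective-receive scan); the recorded
%% index puts each result back in its original slot.
pmap(F, L) ->
    Self = self(),
    Ref = make_ref(),
    {PidIdx, N} = lists:foldl(
        fun(I, {M, C}) ->
            Pid = spawn(fun() -> Self ! {self(), Ref, (catch F(I))} end),
            {M#{Pid => C}, C + 1}
        end, {#{}, 0}, L),
    Results = gather(N, Ref, PidIdx, #{}),
    [maps:get(C, Results) || C <- lists:seq(0, N - 1)].

gather(0, _Ref, _PidIdx, Acc) -> Acc;
gather(N, Ref, PidIdx, Acc) ->
    receive
        {Pid, Ref, Ret} ->
            Idx = maps:get(Pid, PidIdx),
            gather(N - 1, Ref, PidIdx, Acc#{Idx => Ret})
    end.
```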
