Rate-limited event handlers in Erlang/OTP

Posted 2024-11-29 15:38:22


I have a data source that produces points at a potentially high rate, and I'd like to perform a possibly time-consuming operation on each point; but I would also like the system to degrade gracefully when it becomes overloaded, by dropping excess data points.

As far as I can tell, a gen_event will never skip events. Conceptually, what I would like the gen_event to do is to drop all but the latest pending event before running the handlers again.

Is there a way to do this with standard OTP? Or is there a good reason why I should not handle things that way?

So far the best I have is using a gen_server and relying on its timeout to trigger the expensive events:

-behaviour(gen_server).

init([]) ->
    {ok, Pid} = gen_event:start_link(),
    {ok, {Pid, none}}.

handle_call({add, H, A}, _From, {Pid, Data}) ->
    {reply, gen_event:add_handler(Pid, H, A), {Pid, Data}}.

handle_cast(Data, {Pid, _OldData}) ->
    %% overwrite any pending point and set a zero timeout so the
    %% expensive notification fires as soon as the mailbox is empty
    {noreply, {Pid, Data}, 0}.

handle_info(timeout, {Pid, Data}) ->
    gen_event:sync_notify(Pid, Data),
    {noreply, {Pid, Data}}.

Is this approach correct? (esp. with respect to supervision?)
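The drop-all-but-latest behaviour described above can also be sketched without gen_event, as a plain process that drains its own mailbox before each expensive run. This is a hypothetical helper (the module name and message shape are assumptions), not standard OTP:

-module(latest_only).
-export([start/1]).

%% Sketch only: before each run of Handler, collapse all queued
%% {point, P} messages down to the newest one.
start(Handler) ->
    spawn_link(fun() -> loop(Handler) end).

loop(Handler) ->
    receive
        {point, P} ->
            Latest = drain(P),   % discard all but the newest pending point
            Handler(Latest),     % the possibly time-consuming operation
            loop(Handler)
    end.

%% Pull any further queued points off the mailbox without blocking,
%% keeping only the last one seen.
drain(Latest) ->
    receive
        {point, P} -> drain(P)
    after 0 ->
        Latest
    end.

Anything that arrives while Handler runs simply queues up in the mailbox and is collapsed to a single point by drain/1 on the next iteration.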


Comments (2)

鹿港巷口少年归 2024-12-06 15:38:22


I can't comment on supervision, but I would implement this as a queue with expiring items.

I've implemented something you can use below.

I made it a gen_server; when you create it, you give it a maximum age for old items.

Its interface is that you can send it items to be processed, and you can request items that have not yet been dequeued. It records the time at which it receives every item. Every time it receives an item to be processed, it checks all the items in the queue, dequeueing and discarding those that are older than the maximum age. (If you want the maximum age to always be respected, you can also filter the queue before returning queued items.)

Your data source will cast data ({process_this, Anything}) to the work queue, and your (potentially slow) consumer processes will call (gimme) to get data.

-module(work_queue).
-behaviour(gen_server).

-export([init/1, handle_cast/2, handle_call/3]).

%% DiscardAfter is the maximum item age, in microseconds.
init(DiscardAfter) ->
  {ok, {DiscardAfter, queue:new()}}.

handle_cast({process_this, Data}, {DiscardAfter, Queue0}) ->
  Instant = erlang:timestamp(),  % now/0 is deprecated; same tuple format
  %% drop anything that has been waiting longer than DiscardAfter
  Queue1 = queue:filter(fun({Stamp, _}) -> not too_old(Stamp, Instant, DiscardAfter) end, Queue0),
  Queue2 = queue:in({Instant, Data}, Queue1),
  {noreply, {DiscardAfter, Queue2}}.

handle_call(gimme, _From, State = {DiscardAfter, Queue0}) ->
  case queue:is_empty(Queue0) of
    true ->
      {reply, no_data, State};
    false ->
      {{value, {_Stamp, Data}}, Queue1} = queue:out(Queue0),
      {reply, {data, Data}, {DiscardAfter, Queue1}}
  end.

%% difference between two erlang:timestamp() tuples, in microseconds
delta({Mega1, Sec1, Micro1}, {Mega2, Sec2, Micro2}) ->
  ((Mega2 - Mega1) * 1000000 + Sec2 - Sec1) * 1000000 + Micro2 - Micro1.

too_old(Stamp, Instant, DiscardAfter) ->
  delta(Stamp, Instant) > DiscardAfter.

Little demo at the REPL:

c(work_queue).
{ok, PidSrv} = gen_server:start(work_queue, 10 * 1000000, []).  % discard after 10 s
gen_server:cast(PidSrv, {process_this, <<"going_to_go_stale">>}),
timer:sleep(11 * 1000),
gen_server:cast(PidSrv, {process_this, <<"going to push out previous">>}),
{gen_server:call(PidSrv, gimme), gen_server:call(PidSrv, gimme)}.
%% => {{data, <<"going to push out previous">>}, no_data}
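A consumer built on this interface might poll in a loop; the sketch below is hypothetical (the function name and back-off interval are assumptions):

%% Repeatedly pull items off the work queue, backing off briefly
%% when the queue is empty.
consume(PidSrv, Handler) ->
    case gen_server:call(PidSrv, gimme) of
        {data, Data} ->
            Handler(Data);            % the possibly slow operation
        no_data ->
            timer:sleep(100)          % assumed back-off interval
    end,
    consume(PidSrv, Handler).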
凉世弥音 2024-12-06 15:38:22


Is there a way to do this with standard OTP?

No.

Is there a good reason why I should not handle things that way?

No; timing out early can increase the performance of the entire system. Read about how here.

Is this approach correct? (esp. with respect to supervision?)

No idea, you haven't provided the supervision code.


As a bit of extra information to your first question:

If you can use third-party libraries outside of OTP, there are a few out there that can add preemptive timeouts, which is what you are describing.

There are two that I am familiar with: the first is dispcount, and the second is chick (I'm the author of chick; I'll try not to advertise the project here).

Dispcount works really well for single resources that only have a limited number of jobs that can be run at the same time, and it does no queuing. You can read about it here (warning: lots of really interesting information!).

Dispcount didn't work for me because I would have had to spawn 4000+ pools of processes to handle the number of different queues inside my app. I wrote chick because I needed a way to dynamically increase and decrease my queue length, as well as being able to queue up requests and deny others, without having to spawn 4000+ pools of processes.

If I were you I would try out dispcount first (as most solutions do not need chick), and then, if you need something a bit more dynamic, a pool that can respond to a certain number of requests, try out chick.
