Rate-limited event handler in erlang/OTP

I have a data source that produces point at a potentially high rate, and I'd like to perform a possibly time-consuming operation on each point; but I would also like the system to degrade gracefully when it becomes overloaded, by dropping excess data points.

As far as I can tell, using a gen_event will never skip events. Conceptually, what I would like the gen_event to do is to drop all but the latest pending events before running the handlers again.

Is there a way to do this with standard OTP ? or is there a good reason why I should not handle things that way ?

So far the best I have is using a gen_server and relying on the timeout to trigger the expensive events:

init() -> 
    {ok, Pid} = gen_event:start_link(),
    {ok, {Pid, none}}.

handle_call({add, H, A},_From,{Pid,Data}) ->
    {reply, gen_event:add_handler(Pid,H,A), {Pid,Data}}.

handle_cast(Data,{Pid,_OldData}) -> 
    {noreply, {Pid,Data,0}}.  % set timeout to 0 

handle_info(timeout, {Pid,Data}) ->
    {noreply, {Pid,Data}}.

Is this approach correct ? (esp. with respect to supervision ? )

1 Answers

I can't comment on supervision, but I would implement this as a queue with expiring items.

I've implemented something that you can use below.

I made it a gen_server; when you create it you give it a maximum age for old items.

Its interface is that you can send it items to be processed and you can request items that have not been dequeued It records the time at which it receives every item. Every time it receives an item to be processed, it checks all the items in the queue, dequeueing and discarding those that are older than the maximum age. (If you want the maximum age to be always respected, you can filter the queue before you return queued items)

Your data source will cast data ({process_this, Anything}) to the work queue and your (potentially slow) consumers process will call (gimme) to get data.


-export([init/1, handle_cast/2, handle_call/3]).

init(DiscardAfter) ->
  {ok, {DiscardAfter, queue:new()}}.

handle_cast({process_this, Data}, {DiscardAfter, Queue0}) ->
  Instant = now(),
  Queue1 = queue:filter(fun({Stamp, _}) -> not too_old(Stamp, Instant, DiscardAfter) end, Queue0),
  Queue2 = queue:in({Instant, Data}, Queue1),
  {noreply, {DiscardAfter, Queue2}}.

handle_call(gimme, From, State = {DiscardAfter, Queue0}) ->
  case queue:is_empty(Queue0) of
    true ->
      {reply, no_data, State};
    false ->
      {{value, {_Stamp, Data}}, Queue1} = queue:out(Queue0),
      {reply, {data, Data}, {DiscardAfter, Queue1}}

delta({Mega1, Unit1, Micro1}, {Mega2, Unit2, Micro2}) ->
  ((Mega2 - Mega1) * 1000000 + Unit2 - Unit1) * 1000000 + Micro2 - Micro1.

too_old(Stamp, Instant, DiscardAfter) ->
  delta(Stamp, Instant) > DiscardAfter.

Little demo at the REPL:

{ok, PidSrv} = gen_server:start(work_queue, 10 * 1000000, []).         
gen_server:cast(PidSrv, {process_this, <<"going_to_go_stale">>}),      
timer:sleep(11 * 1000),                                                
gen_server:cast(PidSrv, {process_this, <<"going to push out previous">>}),
{gen_server:call(PidSrv, gimme), gen_server:call(PidSrv, gimme)}.        
