Disclaimer: I'm pretty new to Erlang and OTP.
I want a simple pubsub in Erlang/OTP, where processes could subscribe at some "hub" and receive a copy of messages that were sent to that hub.
I know about gen_event
, but it processes events in one single event manager process, while I want every subscriber to be a separate, autonomous process. Also, I was unable to grok gen_event
's handlers supervision. Unfortunately, Google results were full of XMPP (Ejabberd) and RabbitMQ links, so I didn't find anything relevant to my idea.
My idea is that such pubsub model seamlessly maps to supervision tree. So I thought to extend the supervisor (a gen_server
under the hood) to be able to send a cast message to all its children.
I've hacked this in my quick-and-dirty custom "dispatcher" behavior:
-module(dispatcher).
-extends(supervisor).
-export([notify/2, start_link/2, start_link/3, handle_cast/2]).
start_link(Mod, Args) ->
gen_server:start_link(dispatcher, {self, Mod, Args}, []).
start_link(SupName, Mod, Args) ->
gen_server:start_link(SupName, dispatcher, {SupName, Mod, Args}, []).
notify(Dispatcher, Message) ->
gen_server:cast(Dispatcher, {message, Message}).
handle_cast({message, Message}, State) ->
{reply, Children, State} = supervisor:handle_call(which_children, dummy, State),
Pids = lists:filter(fun(Pid) -> is_pid(Pid) end,
lists:map(fun({_Id, Child, _Type, _Modules}) -> Child end,
Children)),
[gen_server:cast(Pid, Message) || Pid <- Pids],
{noreply, State}.
However, while everything seem to work fine at the first glance (children receive messages and are seamlessly restarted when they fail), I wonder whenever this was a good idea.
Could someone, please, criticize (or approve) my approach, and/or recommend some alternatives?
I've recently used gproc to implement pubsub. The example from the readme does the trick.
subscribe(EventType) ->
%% Gproc notation: {p, l, Name} means {(p)roperty, (l)ocal, Name}
gproc:reg({p, l, {?MODULE, EventType}}).
notify(EventType, Msg) ->
Key = {?MODULE, EventType},
gproc:send({p, l, Key}, {self(), Key, Msg}).
From your code it looks to me that gen_event handlers are a perfect match.
The handler callbacks are called from one central process dispatching the messages, but these callbacks shouldn't do much work.
So if you need a autonomous process with its own state for the subscribers, just send a message in the event callback.
Usually these autonomous processes would be gen_servers and you just would call gen_server:cast from your event callbacks.
Supervision is a separate issue, that can be handled by the usual supervision infrastructure that comes with OTP. How you want to do supervision depends on the semantics of your subscriber processes. If they are all identical servers, you could use a simple_one_for_one
for example.
In the init
callback of the subscriber processes you can put the gen_event:add_handler
calls that adds them to the event manager.
You can even use the event manager as supervisor if you use the gen_event:add_sup_handler
function to add your processes if the semantics of this suits you.
Online resources for understanding gen_event better: Learn you some Erlang chapter
Otherwise the Erlang books all have some gen_event introduction. Probably the most thorough one you can find in Erlang and OTP in Action
Oh and BTW: I wouldn't hack up your own supervisors for this.
A very simple example where you do it all yourself is in my very basic chat_demo which is a simple web-based chat server. Look at chat_backend.erl
(or chat_backend.lfe
if you like parentheses) which allows users to subscribe and they will then be sent all messages that arrive at the backend. It does not fit into supervision trees though the modification is simple (although it does use proc_lib
to get better error messages).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With