Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to write an event bus in Haskell?

I am struggling with a design problem in Haskell that I cannot seem to solve in an elegant and satisfying way. I have a system which at its core is based on the concept of event sourcing: the state of the system results from the application of a sequence of events to an initial state. There are different types of events, each type being related to a specific component of the system through a type family:

class Model a where
   data Event a :: * 
   apply :: Event a -> a -> a

instance Model Foo where
   data Event Foo = Foo Int
   ...

instance Model Bar where
   data Event Bar = Bar String
   ...

Currently the system is 100% synchronous and coupled, each model having access to all other model's events and this is quickly becoming a mess, so I want to decouple things through the introduction of an event bus Bus Events in such a way I should be able to write something like dispatch :: Bus Events -> Consumer (Event Foo) -> Bus Events to attach some consumer of Event Foo to a Bus Events assuming there is some form of subtyping or subsumption between Event Foo and Events. Then I can add asynchronicity by ensure consumers each run in their own threads.

From a system point of view, this would allow me to ensure each component is independently packagable, limiting dependencies to a subset of all events. Events type would be defined at the whole application level. This problem looks deceptively similar to discrete-time FRP yet I cannot seem to be able to wrap my head around it...

Has anyone already dealt with something similar and if yes, how?

EDIT:

I came up with the following code, which makes no use of Source but is greatly inspired by @Cirdec's proposal:

import           Control.Applicative
import           Control.Concurrent
import           Control.Concurrent.STM
import           Control.Monad.Reader
import qualified Data.Vector            as V

type Handlers e = V.Vector (Handler e)

data EventBus e = EventBus { handlers    :: Handlers e
                           , eventQueue  :: TChan e
                           , eventThread :: MVar ThreadId
                           }

newBus :: IO (EventBus e)
newBus = do
  chan <- newTChanIO
  var <- newEmptyMVar
  return $ EventBus V.empty chan var

addHandler :: Handler e -> EventBus e -> EventBus e
addHandler h b@EventBus{..} = b { handlers = V.snoc handlers h }

removeHandler :: Int -> EventBus e -> EventBus e
removeHandler idx b@EventBus{..} = b { handlers = let (h,t) = V.splitAt idx handlers
                                                  in h V.++ V.tail t }

startBus :: EventBus e -> IO (EventBus e)
startBus b@EventBus{..} = do
  tid <- forkIO (runBus b)
  putMVar eventThread tid
  return b

runBus :: EventBus e -> IO ()
runBus b@EventBus{..} = do
  _ <- takeMVar eventThread
  forever $ do
    e <- liftIO $ atomically $ readTChan eventQueue
    v <- newTVarIO b
    runReaderT (runEvents $ publish e) v

-- | A monad to handle pub/sub of events of type @e@
newtype Events e a = Events { runEvents :: ReaderT (TVar (EventBus e)) IO a }
                   deriving (Applicative, Functor, Monad, MonadIO, MonadReader (TVar (EventBus e)))

newtype Handler e = Handler { handle :: Events e ()                 -- Unsubscription function
                                     -> Events e (e -> Events e ())  -- what to do with events @e@
                            }


-- | Register a new @Handler e@ within given @Events e@ context
subscribe :: Handler e -> Events e ()
subscribe h = do
  bus <- ask
  liftIO $ atomically $ modifyTVar' bus (addHandler h)

unsubscribe :: Int -> Events e ()
unsubscribe idx = do
  bus <- ask
  liftIO $ atomically $ modifyTVar' bus (removeHandler idx)

publishBus :: EventBus e -> e -> IO ()
publishBus EventBus{..} = atomically . writeTChan eventQueue

publish :: e -> Events e ()
publish event = do
  EventBus{..} <- ask >>= liftIO . atomically . readTVar
  forM_ (zip (V.toList handlers) [0..]) (dispatch event)

dispatch :: e -> (Handler e, Int) -> Events e ()
dispatch event (Handler h, idx) = do
  hdl <- h (unsubscribe idx)
  hdl event

printer :: (Show s) => String -> Handler s
printer prefix = Handler ( \ _ -> return $ \ e -> liftIO (putStrLn $ prefix ++ show e))
like image 999
insitu Avatar asked Sep 07 '15 08:09

insitu


1 Answers

An source of events carrying as that can be subscribed to has the following type

type Source m a = (a -> m ()) -> m (m ())
                   |             |  ^--- how to unsubscribe             
                   |             ^--- how to subscribe
                   ^--- what to do when an `a` happens

A consumer or handler of events is naively something that accepts an event source and subscribes to it

type Handler m a = (Source m a             ) -> m ()
                 = ((a -> m ()) -> m (m ())) -> m ()
                                                ^-- set up the consumer.

This is a bit convoluted, we can invert things and get a nicer representation for an event handler:

type Handler m a = m () -> m (a -> m ())
                   |       |  ^-- what to do when an `a` happens
                   |       ^-- set up the consumer
                   ^-- how to unsubscribe

The original event source was a bit tricky to use; a subscriber might want to unsubscribe in response to an event happening, in which case they'd need to recursively get the resulting unsubscribe action into what to do when an event happens. Starting with the nicer definition of a Handler we don't have this problem. An event source is now something that accepts an event handler and publishes to it.

type Source m a = (Handler m a          ) -> m ()
                = (m () -> m (a -> m ())) -> m ()
                                             ^-- how to subscribe
like image 72
Cirdec Avatar answered Nov 12 '22 04:11

Cirdec