I am struggling with a design problem in Haskell that I cannot seem to solve in an elegant and satisfying way. I have a system which at its core is based on the concept of event sourcing: the state of the system results from the application of a sequence of events to an initial state. There are different types of events, each type being related to a specific component of the system through a type family:
class Model a where
  data Event a :: *
  apply :: Event a -> a -> a

instance Model Foo where
  data Event Foo = Foo Int
  ...

instance Model Bar where
  data Event Bar = Bar String
  ...
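To make the event-sourcing idea concrete, here is a minimal sketch of how state can be rebuilt by folding `apply` over a list of events. The `Counter` model, its `Added` and `Reset` events, and the `replay` helper are hypothetical names used only for illustration; they are not part of the system described above.

```haskell
{-# LANGUAGE TypeFamilies #-}

import Data.List (foldl')

-- Same shape as the class in the question.
class Model a where
  data Event a
  apply :: Event a -> a -> a

-- Hypothetical model: a counter whose events add to it or reset it.
newtype Counter = Counter Int
  deriving (Eq, Show)

instance Model Counter where
  data Event Counter = Added Int | Reset
  apply (Added n) (Counter c) = Counter (c + n)
  apply Reset     _           = Counter 0

-- Rebuild the state by applying the events in order, oldest first.
replay :: Model a => a -> [Event a] -> a
replay = foldl' (flip apply)
```

For instance, `replay (Counter 0) [Added 2, Added 3, Reset, Added 4]` evaluates to `Counter 4`.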
Currently the system is 100% synchronous and coupled: each model has access to all other models' events, and this is quickly becoming a mess. I want to decouple things by introducing an event bus `Bus Events`, in such a way that I should be able to write something like
dispatch :: Bus Events -> Consumer (Event Foo) -> Bus Events
to attach some consumer of `Event Foo` to a `Bus Events`, assuming there is some form of subtyping or subsumption between `Event Foo` and `Events`.
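One possible way to encode that subsumption, offered only as a sketch and not as the established solution: make `Events` an application-level sum type and describe the relation with a two-parameter class. Everything here is an assumption for illustration: `FooEvent` and `BarEvent` stand in for `Event Foo` and `Event Bar`, and `Subsumes`, `inject`, `project`, `emptyBus`, `publish` and `recorder` are hypothetical names. The bus is modelled as a plain list of consumers.

```haskell
{-# LANGUAGE MultiParamTypeClasses #-}

import Data.IORef

-- Hypothetical stand-ins for 'Event Foo' and 'Event Bar'.
newtype FooEvent = FooEvent Int    deriving (Eq, Show)
newtype BarEvent = BarEvent String deriving (Eq, Show)

-- Application-level sum of every event type.
data Events = OfFoo FooEvent | OfBar BarEvent
  deriving (Eq, Show)

-- 'sub' is subsumed by 'sup' if it can be injected into it
-- and possibly projected back out of it.
class Subsumes sub sup where
  inject  :: sub -> sup
  project :: sup -> Maybe sub

instance Subsumes FooEvent Events where
  inject = OfFoo
  project (OfFoo e) = Just e
  project _         = Nothing

instance Subsumes BarEvent Events where
  inject = OfBar
  project (OfBar e) = Just e
  project _         = Nothing

type Consumer e = e -> IO ()

-- Assumption: a bus is just the list of attached consumers.
newtype Bus ev = Bus [Consumer ev]

emptyBus :: Bus ev
emptyBus = Bus []

-- Attach a consumer of a narrower event type; events that do not
-- project to that type are silently skipped.
dispatch :: Subsumes e ev => Bus ev -> Consumer e -> Bus ev
dispatch (Bus cs) c = Bus (cs ++ [maybe (pure ()) c . project])

-- Deliver one event to every attached consumer, in attachment order.
publish :: Bus ev -> ev -> IO ()
publish (Bus cs) ev = mapM_ ($ ev) cs

-- Hypothetical consumer that appends each event it sees to an IORef.
recorder :: IORef [e] -> Consumer e
recorder ref e = modifyIORef ref (++ [e])
```

With this encoding a component only needs its own event type and the `Subsumes` class; the `Events` sum and its instances live at the application level.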
Then I can add asynchronicity by ensuring that each consumer runs in its own thread.
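A rough sketch of what "one thread per consumer" could look like using only `Control.Concurrent` from `base`. The `Mailbox`, `spawnConsumer` and `deliver` names are hypothetical, and error handling and shutdown are deliberately left out.

```haskell
import Control.Concurrent
import Control.Monad (forever)

-- Hypothetical: each consumer is addressed through a private queue.
newtype Mailbox e = Mailbox (Chan e)

-- Start a consumer on its own thread; it processes the events
-- from its mailbox one at a time, in arrival order.
spawnConsumer :: (e -> IO ()) -> IO (Mailbox e)
spawnConsumer consume = do
  box <- newChan
  _ <- forkIO (forever (readChan box >>= consume))
  pure (Mailbox box)

-- Publishing only enqueues the event; it never waits for a consumer to finish.
deliver :: [Mailbox e] -> e -> IO ()
deliver boxes e = mapM_ (\(Mailbox box) -> writeChan box e) boxes
```

Because `Chan` is unbounded, a slow consumer delays only itself, at the cost of a growing queue.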
From a system point of view, this would allow me to ensure each component is independently packageable, limiting its dependencies to a subset of all events. The `Events` type would be defined at the whole-application level.
This problem looks deceptively similar to discrete-time FRP yet I cannot seem to be able to wrap my head around it...
Has anyone already dealt with something similar and if yes, how?
EDIT:
I came up with the following code, which makes no use of `Source` but is greatly inspired by @Cirdec's proposal:
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE RecordWildCards            #-}

import Control.Applicative
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad (forM_, forever)  -- not re-exported by newer mtl versions
import Control.Monad.Reader
import qualified Data.Vector as V

type Handlers e = V.Vector (Handler e)

data EventBus e = EventBus { handlers    :: Handlers e
                           , eventQueue  :: TChan e
                           , eventThread :: MVar ThreadId
                           }

newBus :: IO (EventBus e)
newBus = do
  chan <- newTChanIO
  var  <- newEmptyMVar
  return $ EventBus V.empty chan var

addHandler :: Handler e -> EventBus e -> EventBus e
addHandler h b@EventBus{..} = b { handlers = V.snoc handlers h }

removeHandler :: Int -> EventBus e -> EventBus e
removeHandler idx b@EventBus{..} = b { handlers = let (h,t) = V.splitAt idx handlers
                                                  in  h V.++ V.tail t }

startBus :: EventBus e -> IO (EventBus e)
startBus b@EventBus{..} = do
  tid <- forkIO (runBus b)
  putMVar eventThread tid
  return b

runBus :: EventBus e -> IO ()
runBus b@EventBus{..} = do
  _ <- takeMVar eventThread  -- wait until startBus has stored the thread id
  forever $ do
    e <- liftIO $ atomically $ readTChan eventQueue
    v <- newTVarIO b
    runReaderT (runEvents $ publish e) v

-- | A monad to handle pub/sub of events of type @e@
newtype Events e a = Events { runEvents :: ReaderT (TVar (EventBus e)) IO a }
  deriving (Applicative, Functor, Monad, MonadIO, MonadReader (TVar (EventBus e)))

newtype Handler e = Handler { handle :: Events e ()                  -- Unsubscription function
                                     -> Events e (e -> Events e ())  -- what to do with events @e@
                            }

-- | Register a new @Handler e@ within given @Events e@ context
subscribe :: Handler e -> Events e ()
subscribe h = do
  bus <- ask
  liftIO $ atomically $ modifyTVar' bus (addHandler h)

unsubscribe :: Int -> Events e ()
unsubscribe idx = do
  bus <- ask
  liftIO $ atomically $ modifyTVar' bus (removeHandler idx)

publishBus :: EventBus e -> e -> IO ()
publishBus EventBus{..} = atomically . writeTChan eventQueue

publish :: e -> Events e ()
publish event = do
  EventBus{..} <- ask >>= liftIO . atomically . readTVar
  forM_ (zip (V.toList handlers) [0..]) (dispatch event)

dispatch :: e -> (Handler e, Int) -> Events e ()
dispatch event (Handler h, idx) = do
  hdl <- h (unsubscribe idx)
  hdl event

printer :: (Show s) => String -> Handler s
printer prefix = Handler ( \ _ -> return $ \ e -> liftIO (putStrLn $ prefix ++ show e))
A source of events carrying `a`s that can be subscribed to has the following type
type Source m a = (a -> m ()) -> m (m ())
                  |              |  ^--- how to unsubscribe
                  |              ^--- how to subscribe
                  ^--- what to do when an `a` happens
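As an illustration of this first `Source` shape, here is a minimal sketch of a source in `IO` backed by a mutable list of subscribers. The `newSource` helper and the `fire` action it returns are hypothetical additions, and the sketch is not thread-safe.

```haskell
import Data.IORef

type Source m a = (a -> m ()) -> m (m ())

-- Hypothetical helper: build a Source in IO together with a 'fire'
-- action that pushes an event to the current subscribers.
newSource :: IO (Source IO a, a -> IO ())
newSource = do
  subscribers <- newIORef []           -- list of (key, callback) pairs
  nextKey     <- newIORef (0 :: Int)
  let subscribe callback = do
        key <- atomicModifyIORef' nextKey (\k -> (k + 1, k))
        modifyIORef' subscribers (++ [(key, callback)])
        -- The returned action removes exactly this subscription.
        pure (modifyIORef' subscribers (filter ((/= key) . fst)))
      fire event =
        readIORef subscribers >>= mapM_ (\(_, callback) -> callback event)
  pure (subscribe, fire)
```

Subscribing returns the unsubscribe action, so the caller has to hold on to it and thread it into the callback if the callback itself should be able to unsubscribe.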
A consumer or handler of events is naively something that accepts an event source and subscribes to it
type Handler m a = (Source m a             ) -> m ()
                 = ((a -> m ()) -> m (m ())) -> m ()
                                                ^-- set up the consumer.
This is a bit convoluted; we can invert things and get a nicer representation for an event handler:
type Handler m a = m () -> m (a -> m ())
                   |       |  ^-- what to do when an `a` happens
                   |       ^-- set up the consumer
                   ^-- how to unsubscribe
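To show why receiving the unsubscribe action up front is convenient, here is a small sketch of a handler that stops listening on its own. The `takeN` name and its behaviour are hypothetical; it only assumes the `Handler` shape above, specialised to `IO`.

```haskell
import Control.Monad (when)
import Data.IORef

type Handler m a = m () -> m (a -> m ())

-- Hypothetical handler: run 'act' on the first n events, then unsubscribe.
takeN :: Int -> (a -> IO ()) -> Handler IO a
takeN n act unsubscribe = do
  remaining <- newIORef n
  -- Nothing to consume at all: unsubscribe during setup.
  when (n <= 0) unsubscribe
  pure $ \event -> do
    left <- readIORef remaining
    when (left > 0) $ do
      act event
      writeIORef remaining (left - 1)
      -- That was the last event we wanted.
      when (left == 1) unsubscribe
```

The handler never needs to know how the source implements unsubscription; it just runs the action it was given.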
The original event source was a bit tricky to use; a subscriber might want to unsubscribe in response to an event happening, in which case they'd need to recursively get the resulting unsubscribe action into what to do when an event happens. Starting with the nicer definition of a `Handler` we don't have this problem. An event source is now something that accepts an event handler and publishes to it.
type Source m a = (Handler m a         ) -> m ()
                = (m () -> m (a -> m ())) -> m ()
                                             ^-- how to subscribe
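A minimal sketch of a source with this inverted shape, again in `IO` and backed by a mutable subscriber list. The `newSource` helper, the `fire` action and the `cancelled` flag are hypothetical details chosen for illustration, and the sketch is not thread-safe.

```haskell
import Control.Monad (unless)
import Data.IORef

type Handler m a = m () -> m (a -> m ())
type Source  m a = Handler m a -> m ()

-- Hypothetical helper: build a Source in IO together with a 'fire'
-- action that publishes an event to the registered handlers.
newSource :: IO (Source IO a, a -> IO ())
newSource = do
  subscribers <- newIORef []           -- list of (key, onEvent) pairs
  nextKey     <- newIORef (0 :: Int)
  let subscribe handler = do
        key       <- atomicModifyIORef' nextKey (\k -> (k + 1, k))
        cancelled <- newIORef False
        let unsubscribe = do
              writeIORef cancelled True
              modifyIORef' subscribers (filter ((/= key) . fst))
        -- Hand the handler its own unsubscribe action before it sees any event.
        onEvent <- handler unsubscribe
        -- Register it unless it already unsubscribed during setup.
        gone <- readIORef cancelled
        unless gone (modifyIORef' subscribers (++ [(key, onEvent)]))
      fire event =
        readIORef subscribers >>= mapM_ (\(_, onEvent) -> onEvent event)
  pure (subscribe, fire)
```

A handler such as `\unsubscribe -> pure (\x -> print x >> unsubscribe)` then receives exactly one event, with no recursive knot-tying on the subscriber's side.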