I have a Clojure processing app built as a pipeline of channels. Each processing step does its computation asynchronously (i.e. makes an HTTP request using http-kit or something similar) and puts its result on the output channel. This way the next step can read from that channel and do its own computation.
My main function looks like this:
(defn -main [args]
  (-> file/tmp-dir
      (schedule/scheduler)
      (search/searcher)
      (process/resultprocessor)
      (buy/buyer)
      (report/reporter)))
Currently, the scheduler step drives the pipeline (it has no input channel) and feeds work into the rest of the chain.
When I run this in the REPL:
(-main "some args")
It basically runs forever, because the scheduler never stops producing work. What is the best way to change this architecture so that I can shut down the whole system from the REPL? Does closing each channel mean the system terminates?
Would some broadcast channel help?
I would suggest using something like https://github.com/stuartsierra/component to handle system setup. It makes it easy to start and stop your system in the REPL. With that library, each processing step would be a component, and each component would handle setup and teardown of its channels in its start and stop methods (from the library's Lifecycle protocol). You could also create an IStream protocol for each component to implement and have each component depend on components implementing that protocol. That buys you some very easy modularity.
You'd end up with a system that looks like the following:
(component/system-map
  :scheduler (schedule/new-scheduler file/tmp-dir)
  :searcher  (component/using (search/searcher)
                              {:in :scheduler})
  :processor (component/using (process/resultprocessor)
                              {:in :searcher})
  :buyer     (component/using (buy/buyer)
                              {:in :processor})
  :report    (component/using (report/reporter)
                              {:in :buyer}))
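As a rough sketch of what one such component could look like (the Step record, its f field and the buffer size are made up for illustration, and it assumes every component exposes its output channel under an :out key), a processing step might be written like this:

```clojure
(require '[clojure.core.async :as async]
         '[com.stuartsierra.component :as component])

;; Hypothetical processing step. `in` is the upstream component injected by
;; component/using; we assume it exposes its output channel under :out.
;; `f` is the step's computation and must not return nil.
(defrecord Step [f in out]
  component/Lifecycle
  (start [this]
    (if out
      this                                    ; already started, do nothing
      (let [in-ch  (:out in)
            out-ch (async/chan 10)]
        (async/go
          ;; Stop when upstream closes or when our own out-ch is closed.
          (loop []
            (when-some [v (async/<! in-ch)]
              (when (async/>! out-ch (f v))
                (recur))))
          ;; Propagate the shutdown to downstream steps.
          (async/close! out-ch))
        (assoc this :out out-ch))))
  (stop [this]
    (when out (async/close! out))
    (assoc this :out nil)))

;; At the REPL, with `system` bound to a system-map like the one above:
;; (def running (component/start system))
;; (component/stop running)
```

component/start starts the components in dependency order and component/stop stops them in reverse order, so the whole pipeline can be brought up and torn down with one call each.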
One nice thing about this sort of approach is that you can easily add further components that rely on a channel. For example, if each component creates its out channel using a tap on an internal mult, you could add a logger for the processor just by adding a logging component that takes the processor as a dependency:
:processor        (component/using (process/resultprocessor)
                                   {:in :searcher})
:processor-logger (component/using (log/logger)
                                   {:in :processor})
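The mult/tap mechanics behind that idea can be tried out with core.async alone. This is only a sketch: tapped-out and the channel names are hypothetical, and in the component version each dependent would create its tap in its own start method.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: every consumer gets its own channel that receives
;; a copy of each value flowing through the mult `m`.
(defn tapped-out [m]
  (async/tap m (async/chan 10)))

;; The processor's internal result channel and the mult wrapped around it.
(def results      (async/chan))
(def results-mult (async/mult results))

;; Two independent consumers of the same results.
(def buyer-in  (tapped-out results-mult))
(def logger-in (tapped-out results-mult))

(async/>!! results {:item "book" :price 12})

(def seen-by-buyer  (async/<!! buyer-in))
(def seen-by-logger (async/<!! logger-in))
```

Both consumers see the same value, and closing results also closes every tapped channel, so the shutdown still propagates downstream.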
I'd recommend watching Stuart Sierra's talk on component as well to get an idea of how it works.
You could have your scheduler alts! / alts!! on a kill channel and the input channel of your pipeline:
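(require '[clojure.core.async :as async])

(def kill-channel (async/chan))

;; preprocess stands for whatever turns the input into the next work item.
(defn scheduler [input output-ch kill-ch]
  (loop []
    ;; With :priority true, kill-ch is tried before the put on output-ch.
    (let [[v p] (async/alts!! [kill-ch [output-ch (preprocess input)]]
                              :priority true)]
      (if-not (= p kill-ch)
        (recur)))))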
Putting a value on kill-channel will then terminate the loop.
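To use this from the REPL, the loop has to run off the REPL thread. Here is a minimal self-contained sketch, assuming a hypothetical zero-argument next-item function in place of (preprocess input) and a dropping buffer so the demo needs no consumer:

```clojure
(require '[clojure.core.async :as async])

(defn run-until-killed
  "Runs the scheduler loop on its own thread. Returns a channel that yields
  the number of items put on out-ch once the loop has exited."
  [next-item out-ch kill-ch]
  (async/thread
    (loop [n 0]
      ;; The item is computed before alts!! runs, so the last one is
      ;; simply discarded when the kill channel wins.
      (let [[_ port] (async/alts!! [kill-ch [out-ch (next-item)]]
                                   :priority true)]
        (if (= port kill-ch)
          n
          (recur (inc n)))))))

(def kill-ch (async/chan))
(def out-ch  (async/chan (async/dropping-buffer 100)))

(def worker
  (run-until-killed #(do (Thread/sleep 10) (System/currentTimeMillis))
                    out-ch
                    kill-ch))

;; Later, from the REPL: closing the kill channel works as well as a put.
(async/close! kill-ch)
(def items-sent (async/<!! worker))   ; blocks until the loop has exited
```

Closing the kill channel rather than putting a value on it has the advantage that it never blocks and that it stops every loop listening on that channel, not just one.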
Technically you could also use output-ch to control the process (puts to closed channels return false), but I normally find explicit kill channels cleaner, at least for top-level pipelines.
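For comparison, a sketch of the output-channel variant (produce-until-closed and next-item are hypothetical names). One caveat that speaks for the kill channel: according to the close! docstring, a put that is already blocked stays blocked until a taker releases it, so this only stops promptly when puts do not block, as with the dropping buffer used here.

```clojure
(require '[clojure.core.async :as async])

(defn produce-until-closed
  "Puts items on out-ch until a put returns false, i.e. until out-ch has
  been closed. Returns a channel yielding the number of successful puts."
  [next-item out-ch]
  (async/thread
    (loop [n 0]
      (if (async/>!! out-ch (next-item))
        (recur (inc n))
        n))))

;; Dropping buffer: puts never block, so the close is noticed right away.
(def demo-out (async/chan (async/dropping-buffer 100)))

(def producer
  (produce-until-closed #(do (Thread/sleep 10) :tick) demo-out))

(async/close! demo-out)
(def puts-made (async/<!! producer))
```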
To make things both more elegant and more convenient to use (at the REPL and in production), you could use Stuart Sierra's component: start the scheduler loop on a separate thread and assoc the kill channel onto your component in the component's start method, then close! the kill channel (and thereby terminate the loop) in the component's stop method.
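Put together, that could look roughly like the following. The Scheduler record, its next-item field and the :out key are assumptions made for this sketch, not part of the original code:

```clojure
(require '[clojure.core.async :as async]
         '[com.stuartsierra.component :as component])

;; Hypothetical scheduler component. `next-item` is a zero-argument
;; function producing the next piece of work (it must not return nil).
(defrecord Scheduler [next-item out kill-ch]
  component/Lifecycle
  (start [this]
    (if kill-ch
      this                                    ; already running
      (let [kill   (async/chan)
            out-ch (async/chan 10)]
        (async/thread
          (loop []
            (let [[_ port] (async/alts!! [kill [out-ch (next-item)]]
                                         :priority true)]
              (if (= port kill)
                (async/close! out-ch)         ; let downstream steps finish
                (recur)))))
        (assoc this :kill-ch kill :out out-ch))))
  (stop [this]
    ;; Closing the kill channel makes the take in alts!! succeed,
    ;; which ends the loop on the scheduler thread.
    (when kill-ch (async/close! kill-ch))
    (assoc this :kill-ch nil :out nil)))

;; (def s (component/start (map->Scheduler {:next-item #(rand-int 100)})))
;; (async/<!! (:out s))
;; (component/stop s)
```

Because the loop closes its output channel on the way out, downstream steps that read until they see nil shut down on their own once they have drained what is left.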
You should consider using Stuart Sierra's reloaded workflow, which depends on modelling your pipeline elements as components. That way you can model your logical singletons as 'classes', which lets you control the construction and destruction (start/stop) logic for each of them.