
How to best shut down a clojure core.async pipeline of processes

I have a Clojure processing app that is a pipeline of channels. Each processing step does its computation asynchronously (i.e. makes an HTTP request using http-kit or something similar) and puts its result on the output channel. This way the next step can read from that channel and do its own computation.

My main function looks like this

(defn -main [args]
 (-> file/tmp-dir
  (schedule/scheduler)
  (search/searcher)
  (process/resultprocessor)
  (buy/buyer)
  (report/reporter)))

Currently, the scheduler step drives the pipeline (it hasn't got an input channel), and provides the chain with workload.

When I run this in the REPL:

(-main "some args")

It basically runs forever because the scheduler never stops producing work. What is the best way to change this architecture so that I can shut down the whole system from the REPL? Does closing each channel mean the system terminates?

Would some broadcast channel help?

Marten Sytema asked Oct 10 '15 07:10



3 Answers

I would suggest using something like https://github.com/stuartsierra/component to handle system setup. It lets you start and stop your system easily from the REPL. With that library, each processing step becomes a component, and each component handles setup and teardown of its channels in its start and stop methods. You could also define an IStream protocol for each component to implement, and have each component depend on components implementing that protocol. That buys you modularity very cheaply.

You'd end up with a system that looks like the following:

(component/system-map
 :scheduler (schedule/new-scheduler file/tmp-dir)
 :searcher  (component/using (search/searcher)
                             {:in :scheduler})
 :processor (component/using (process/resultprocessor)
                             {:in :searcher})
 :buyer     (component/using (buy/buyer)
                             {:in :processor})
 :report    (component/using (report/reporter)
                             {:in :buyer}))

One nice thing about this approach is that you can easily add further components that rely on a channel. For example, if each component creates its out channel as a tap on an internal mult, you could add a logger for the processor just by adding a logging component that takes the processor as a dependency.

 :processor (component/using (process/resultprocessor)
                             {:in :searcher})
 :processor-logger (component/using (log/logger)
                                    {:in :processor})
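The mult/tap idea can be tried out with core.async alone, before any components are involved. The following is only a minimal sketch: the names processor-src, processor-mult, new-out-chan, buyer-in and logger-in are made up for illustration, and the buffer size of 10 is an arbitrary choice.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical internals of a "processor" step: results are put on
;; processor-src, and a mult copies every value to all registered taps.
(def processor-src (async/chan))
(def processor-mult (async/mult processor-src))

(defn new-out-chan
  "Returns a fresh buffered channel tapped onto the processor's mult."
  []
  (async/tap processor-mult (async/chan 10)))

;; The buyer and a logger each get their own tap, so both see every result.
(def buyer-in (new-out-chan))
(def logger-in (new-out-chan))

(async/>!! processor-src {:result 1})

(def seen-by-buyer (async/<!! buyer-in))
(def seen-by-logger (async/<!! logger-in))

;; Closing the source closes the tapped channels too (tap's default),
;; so downstream readers eventually receive nil and can stop.
(async/close! processor-src)
(def buyer-after-close (async/<!! buyer-in))
```

Because every downstream consumer holds its own tap, adding the logger does not take values away from the buyer.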

I'd also recommend watching Stuart Sierra's talk on the library to get an idea of how it works.

alvinfrancis answered Oct 14 '22 20:10



You could have your scheduler alts! / alts!! on a kill channel and the input channel of your pipeline:

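(require '[clojure.core.async :as async])

(def kill-channel (async/chan))

;; `preprocess` stands for whatever produces the next unit of work from `input`.
(defn scheduler [input output-ch kill-ch]
  (loop []
    ;; With :priority true, kill-ch is tried first; otherwise put onto output-ch.
    (let [[_ p] (async/alts!! [kill-ch [output-ch (preprocess input)]]
                              :priority true)]
      ;; Keep looping until the operation that completed was on kill-ch.
      (when-not (= p kill-ch)
        (recur)))))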

Putting a value on kill-channel will then terminate the loop.

Technically you could also use output-ch to control the process (puts to closed channels return false), but I normally find explicit kill channels cleaner, at least for top-level pipelines.
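As a rough illustration of the output-channel variant, here is a minimal sketch using only core.async. The function run-producer and its counter workload are hypothetical stand-ins for a real scheduler. Note the drain loop after close!: as far as I understand core.async, a put that is already pending when the channel is closed can stay blocked until something takes it, which is one reason an explicit kill channel tends to be cleaner.

```clojure
(require '[clojure.core.async :as async])

(defn run-producer
  "Puts successive integers on out-ch until a put fails.
  Returns a channel that yields the number of successful puts."
  [out-ch]
  (async/thread
    (loop [n 0]
      ;; >!! returns true on success and false if out-ch is already closed.
      (if (async/>!! out-ch n)
        (recur (inc n))
        n))))

(def out-ch (async/chan))
(def done (run-producer out-ch))

;; Consume three values from the running producer.
(def consumed (vec (repeatedly 3 #(async/<!! out-ch))))

;; Shut down: close the output, then drain so that a put which was
;; already pending at close time is released.
(async/close! out-ch)
(loop []
  (when (some? (async/<!! out-ch))
    (recur)))

;; The producer thread has now exited; depending on timing it made
;; three or four successful puts.
(def puts-made (async/<!! done))
```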

To make this both more elegant and more convenient (at the REPL and in production), you could use Stuart Sierra's component. In the component's start method, start the scheduler loop on a separate thread and assoc the kill channel onto the component; in its stop method, close! the kill channel, which terminates the loop.
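A minimal sketch of such a component might look like the following. It assumes com.stuartsierra.component and core.async are on the classpath; the Scheduler record, its field names, and the counter used as the workload are illustrative placeholders rather than anything from the original pipeline.

```clojure
(require '[clojure.core.async :as async]
         '[com.stuartsierra.component :as component])

(defrecord Scheduler [output-ch kill-ch worker]
  component/Lifecycle
  (start [this]
    (if kill-ch
      this ; already started, keep start idempotent
      (let [kill-ch (async/chan)
            ;; Run the blocking loop on its own thread; `worker` is a
            ;; channel that closes once the loop has exited.
            worker  (async/thread
                      (loop [n 0]
                        (let [[_ p] (async/alts!! [kill-ch [output-ch n]]
                                                  :priority true)]
                          (when-not (= p kill-ch)
                            (recur (inc n))))))]
        (assoc this :kill-ch kill-ch :worker worker))))
  (stop [this]
    (when kill-ch
      ;; A take on a closed channel completes immediately with nil,
      ;; so alts!! selects kill-ch and the loop ends.
      (async/close! kill-ch)
      ;; Wait for the loop thread to finish.
      (async/<!! worker))
    (assoc this :kill-ch nil :worker nil)))

;; REPL usage: start, read a value, stop.
(def out (async/chan 10))
(def running (component/start (map->Scheduler {:output-ch out})))
(def first-value (async/<!! out))
(def stopped (component/stop running))
```

Calling component/stop from the REPL then returns only after the loop thread has finished, so the scheduler is known to be idle before the system is restarted.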

Michał Marczyk answered Oct 14 '22 22:10



You should consider using Stuart Sierra's reloaded workflow, which depends on modelling your 'pipeline' elements as components. That way you can model your logical singletons as 'classes', which lets you control the construction and destruction (start/stop) logic for each one of them.

Krisztián Szabó answered Oct 14 '22 20:10
