Scenario: I have a server listening to six active TCP/IP connections. When a "ready" message comes in, an event will be raised on its own thread. When the server has received a "ready" message from each connection, it needs to run the "start" function.
My object oriented solution would likely involve using a mutex and a counter. Something like:
int _countDown= 6;
object _lock;
void ReadyMessageReceivedForTheFirstTimeFromAConnection() {
lock(_lock) {
--_countDown; //
if (_countDown==0) Start();
}
}
How could this problem be solved in Clojure without resorting to locks/mutexes?
When you prefer a pure clojure version, you can use a promise to give your futures a go.
Every time you receive message you increment the conn-count the watch checks if the treshold is reached and delivers :go to the barrier promise.
(def wait-barrier (promise))
(def conn-count (atom 0))
(add-watch conn-count :barrier-watch
(fn [key ref old-state new-state]
(when (== new-state 6)
(deliver wait-barrier :go))))
dummy-example:
(def wait-barrier (promise))
(def conn-count (atom 0))
(defn worker-dummy []
(when (= @wait-barrier :go)
(println "I'm a worker")))
(defn dummy-receive-msg []
(doall (repeatedly 6,
(fn []
(println "received msg")
(swap! conn-count inc)))))
(let [workers (doall (repeatedly 6 (fn [] (future (worker-dummy)))))]
(add-watch conn-count :barrier-watch
(fn [key ref old-state new-state]
(when (== new-state 6)
(deliver wait-barrier :go))))
(dummy-receive-msg)
(doall (map deref workers)))
You can use a CountDownLatch or a Phaser for this purpose.
In my futures library, imminent, I used both. CountDownLatch first and then replaced it with a Phaser for ForkJoin compatibility (might not be necessary in your case). You can see the change in this diff. Hopefully it gives you an idea of usage for both.
With latches the general idea would be:
(let [latch (CountDownLatch. 6)]
(on-receive-message this (fn [_] (.countDown latch)))
(.await latch)
...or something like that.
Since it hasn't been mentioned so far: you could easily do that with core.async. Have a look at this MCVE:
(let [conn-count 6
ready-chan (chan)]
;; Spawn a thread for each connection.
(doseq [conn-id (range conn-count)]
(thread
(Thread/sleep (rand-int 2000))
(>!! ready-chan conn-id)))
;; Block until all connections are established.
(doseq [total (range 1 (inc conn-count))]
(println (<!! ready-chan) "connected," total "overall"))
;; Invoke start afterwards.
(println "start"))
;; 5 connected, 1 overall
;; 3 connected, 2 overall
;; 4 connected, 3 overall
;; 0 connected, 4 overall
;; 1 connected, 5 overall
;; 2 connected, 6 overall
;; start
;;=> nil
You could also use a channel to implement a countdown latch (borrowed from Christophe Grand):
(defn count-down-latch-chan [n]
(chan 1 (comp (drop (dec n)) (take 1))))
For a short introduction into core.async, check out this Gist. For a longer one, read the corresponding chapter in "Clojure for the Brave and True".
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With