Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Clojure: alternative to using a mutex/lock and a counter

Scenario: I have a server listening to six active TCP/IP connections. When a "ready" message comes in, an event will be raised on its own thread. When the server has received a "ready" message from each connection, it needs to run the "start" function.

My object oriented solution would likely involve using a mutex and a counter. Something like:

int _countDown= 6;
object _lock;
void ReadyMessageReceivedForTheFirstTimeFromAConnection() {
    lock(_lock) {
      --_countDown; // 
       if (_countDown==0) Start();
    }
}

How could this problem be solved in Clojure without resorting to locks/mutexes?

like image 870
Anonymous Avatar asked Feb 11 '16 23:02

Anonymous


3 Answers

When you prefer a pure clojure version, you can use a promise to give your futures a go.

Every time you receive message you increment the conn-count the watch checks if the treshold is reached and delivers :go to the barrier promise.

(def wait-barrier (promise))
(def conn-count (atom 0))

(add-watch conn-count :barrier-watch
           (fn [key ref old-state new-state]
             (when (== new-state 6)
               (deliver wait-barrier :go))))  

dummy-example:

(def wait-barrier (promise))
(def conn-count (atom 0))
(defn worker-dummy []
  (when (= @wait-barrier :go)
    (println "I'm a worker")))

(defn dummy-receive-msg []
  (doall (repeatedly 6,
                     (fn []
                       (println "received msg")
                       (swap! conn-count inc)))))

(let [workers (doall (repeatedly 6 (fn [] (future (worker-dummy)))))]
  (add-watch conn-count :barrier-watch
             (fn [key ref old-state new-state]
               (when (== new-state 6)
                 (deliver wait-barrier :go))))
  (dummy-receive-msg)
  (doall (map deref workers)))
like image 65
murphy Avatar answered Nov 18 '22 20:11

murphy


You can use a CountDownLatch or a Phaser for this purpose.

In my futures library, imminent, I used both. CountDownLatch first and then replaced it with a Phaser for ForkJoin compatibility (might not be necessary in your case). You can see the change in this diff. Hopefully it gives you an idea of usage for both.

With latches the general idea would be:

(let [latch (CountDownLatch. 6)]
  (on-receive-message this (fn [_] (.countDown latch)))
  (.await latch)

...or something like that.

like image 38
leonardoborges Avatar answered Nov 18 '22 20:11

leonardoborges


Since it hasn't been mentioned so far: you could easily do that with core.async. Have a look at this MCVE:

(let [conn-count 6
      ready-chan (chan)]

  ;; Spawn a thread for each connection.
  (doseq [conn-id (range conn-count)]
    (thread
      (Thread/sleep (rand-int 2000))
      (>!! ready-chan conn-id)))

  ;; Block until all connections are established.
  (doseq [total (range 1 (inc conn-count))]
    (println (<!! ready-chan) "connected," total "overall"))

  ;; Invoke start afterwards.
  (println "start"))
;; 5 connected, 1 overall
;; 3 connected, 2 overall
;; 4 connected, 3 overall
;; 0 connected, 4 overall
;; 1 connected, 5 overall
;; 2 connected, 6 overall
;; start
;;=> nil

You could also use a channel to implement a countdown latch (borrowed from Christophe Grand):

(defn count-down-latch-chan [n]
  (chan 1 (comp (drop (dec n)) (take 1))))

For a short introduction into core.async, check out this Gist. For a longer one, read the corresponding chapter in "Clojure for the Brave and True".

like image 2
beatngu13 Avatar answered Nov 18 '22 20:11

beatngu13