When using clojure.core.async, is there a way to have one channel wait for the first item to be put on it, then wait some small amount of time, then get all the items currently on the channel (that could have arrived while waiting) and get all of them without blocking?
I.e. is there a way to implement get-available-items
:
(defn delayer [ch ch2]
(go (loop []
(when-let [v (<! ch)]
(<! (timeout 500))
(let [vs (get-available-items ch)
items (cons v vs)]
(>! ch2 items))
(recur)))))
Basically, something like BlockingQueue.drain in Java.
There are plans to offer this feature with channels, though for now you can check for the presence of something on a channel with:
(alts!! [my-chan] :default :nothing-immediately-in-chan)
by iterating that you can drain a channel without blocking.
PS: extra thanks to tbaldridge and julianlevis on #clojure for helping with this one
You can just alt on the same timeout channel until you run out of "waiting time", collecting any incoming values meanwhile.
These seems to work:
(require '[clojure.core.async :as a :refer [<! >! go chan]])
(defn delayer [in out]
(a/go-loop []
(when-let [v (<! in)]
(loop [batch [v] timeout-ch (a/timeout 500)]
(let [[v ch] (a/alts! [in timeout-ch])]
(if (= in ch)
(recur (conj batch v) timeout-ch)
(>! out batch))))
(recur))))
Notice that we create the timeout channel just once and we reuse it. A simple test to prove that it works:
(def out (chan))
(def in (chan))
(delayer in out)
; print batches as soon as available
(a/go-loop []
(>pprint (str (java.util.Date.) (<! out)))
(recur))
; put a value every 100 millis
(a/go-loop [i 100]
(when-not (zero? i)
(<! (a/timeout 100))
(>! in i)
(recur (dec i))))
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With