Suppose that channel chan
has the values "1" and "2" on queue.
Goal: Make a function which takes chan
and returns the vector [1 2]
. Note that I am totally fine if this function has to block for some time before its value is returned.
Attempt:
(defn chan->vector
[chan]
(let [a (atom true) v []]
(while (not-nil? @a)
(go
(reset! a (<! chan))
(into v @a)
(reset! a (<! chan))
)
) v
)
)
Result: My REPL freezes and eventually spits out a huge error. I have come to realize that this is because the (go ...)
block is asynchronous, and so immediately returns. Thus the atom іn my (while ...)
loop is never given a chance to be set to nil
and the loop can never terminate.
So how do I accomplish the desired result? In case it's relevant, I'm using ClojureScript and targetting nodejs.
you should use alts!
from core.async
to fulfill this task
(https://clojure.github.io/core.async/#clojure.core.async/alts!):
(def x (chan 10))
(go (>! x 1)
(>! x 2)
(>! x 3))
(defn read-all [from-chan]
(<!! (go-loop [res []]
(let [[v _] (alts! [from-chan] :default :complete)]
(if (= v :complete)
res
(recur (conj res v)))))))
(read-all x)
;; output: [1 2 3]
(read-all x)
;; output: []
(go (>! x 10)
(>! x 20)
(>! x 30)
(>! x 40))
(read-all x)
;; output: [10 20 30 40]
inside the go-loop
this (a/alts! [from-chan] :default :complete)
tries to read any value from channel, and in case there are no value available at once it emits the default value, so you will see you should break the loop and return accumulated values.
update: as the blocking read (<!!
) is absent in cljs, you can rewrite it the following way:
(defn read-all [from-chan]
(go-loop [res []]
(let [[v _] (alts! [from-chan] :default :complete)]
(if (= v :complete)
res
(recur (conj res v)))))))
so it will return the channel, and then just read one value from there:
(go (let [res (<! (read-all x))]
(println res)
;; do something else
))
You can use clojure.core.async/reduce
:
;; demo setup
(def ch (async/chan 2))
(async/>!! ch :foo)
(async/>!! ch :bar)
;; background thread to print reduction result
(async/thread
(prn (async/<!! (async/reduce conj [] ch))))
;; closing the channel…
(async/close! ch)
;; …terminates the reduction and the result gets printed out:
;; [:foo :bar]
clojure.core.async/reduce
returns a channel that will produce a value if and when the original channel closes. Internally it uses a go
block and will release control in between taking elements from the original channel.
If you want to produce a value after a certain amount of time passes whether or not the original channel closes, you can either wrap the original channel in a pass-through channel that will close itself after a timeout passes or you can use a custom approach to the reduction step (perhaps the approach suggested by @leetwinski).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With