
How to exhaust a channel's values and then return the result (ClojureScript)?

Suppose that the channel chan has the values "1" and "2" queued.

Goal: Make a function which takes chan and returns the vector [1 2]. Note that I am totally fine if this function has to block for some time before its value is returned.

Attempt:

(defn chan->vector
  [chan]  
  (let [a (atom true) v []]
    (while (not-nil? @a)
      (go
        (reset! a (<! chan))
        (into v @a)
        (reset! a (<! chan))
      )
    ) v
  )
)

Result: My REPL freezes and eventually spits out a huge error. I have come to realize that this is because the (go ...) block is asynchronous, and so immediately returns. Thus the atom in my (while ...) loop is never given a chance to be set to nil and the loop can never terminate.

So how do I accomplish the desired result? In case it's relevant, I'm using ClojureScript and targeting Node.js.

George asked Dec 24 '22 11:12


2 Answers

You can use alts! from core.async for this task (https://clojure.github.io/core.async/#clojure.core.async/alts!):

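;; JVM Clojure: <!! is a blocking take and is not available in ClojureScript
(require '[clojure.core.async :refer [chan go go-loop >! <!! alts!]])

(def x (chan 10))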

(go (>! x 1)
    (>! x 2)
    (>! x 3))

(defn read-all [from-chan]
  (<!! (go-loop [res []]
           (let [[v _] (alts! [from-chan] :default :complete)]
             (if (= v :complete)
               res
               (recur (conj res v)))))))

(read-all x) 
;; output: [1 2 3]

(read-all x)
;; output: []

(go (>! x 10)
    (>! x 20)
    (>! x 30)
    (>! x 40))

(read-all x)
;; output: [10 20 30 40]

Inside the go-loop, (alts! [from-chan] :default :complete) tries to take a value from the channel. If no value is immediately available, it returns the default value instead, which is the signal to break out of the loop and return the accumulated values.

Update: since the blocking take (<!!) is not available in ClojureScript, you can rewrite it as follows:

(defn read-all [from-chan]
  (go-loop [res []]
    (let [[v _] (alts! [from-chan] :default :complete)]
      (if (= v :complete)
        res
        (recur (conj res v))))))

This version returns a channel, so take the single result from it inside a go block:

(go (let [res (<! (read-all x))]
      (println res)
      ;; do something else
      ))
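
One caveat with this loop: alts! on a closed channel returns nil immediately rather than the default value, so comparing only against :complete would keep conj-ing nil and never finish once from-chan is closed. A minimal sketch of a variant that checks which port answered and also stops on nil is shown here. The namespace name and the function name read-available are hypothetical, and the old-style :require-macros form is used on the assumption that it works across core.async versions.

```clojure
(ns example.drain ; hypothetical namespace name
  (:require [cljs.core.async :refer [chan close! alts! <! >!]])
  (:require-macros [cljs.core.async.macros :refer [go go-loop]]))

(defn read-available
  "Returns a channel that yields a vector of the values that can be taken
  from from-chan right away. Stops when nothing is immediately available
  or when from-chan is closed and drained."
  [from-chan]
  (go-loop [res []]
    (let [[v port] (alts! [from-chan] :default :complete)]
      (if (or (= port :default) ; nothing ready: the default was used
              (nil? v))         ; channel closed and empty
        res
        (recur (conj res v))))))

;; usage: buffered values can still be taken after close!
(def c (chan 10))

(go
  (>! c 1)
  (>! c 2)
  (close! c)
  (println (<! (read-available c))))
;; prints: [1 2]
```

Checking the port also means a legitimate :complete value on the channel would not be mistaken for the end of input.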

leetwinski answered Feb 15 '23 10:02


You can use clojure.core.async/reduce:

;; demo setup
(require '[clojure.core.async :as async])
(def ch (async/chan 2))
(async/>!! ch :foo)
(async/>!! ch :bar)

;; background thread to print reduction result
(async/thread
  (prn (async/<!! (async/reduce conj [] ch))))

;; closing the channel…
(async/close! ch)

;; …terminates the reduction and the result gets printed out:
;; [:foo :bar]

clojure.core.async/reduce returns a channel that will produce a value if and when the original channel closes. Internally it uses a go block and will release control in between taking elements from the original channel.
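
The demo above relies on >!!, <!! and thread, which exist only on the JVM. Since the question targets ClojureScript, here is a rough sketch of the same reduction using put! and a go block instead. The namespace name is hypothetical.

```clojure
(ns example.collect ; hypothetical namespace name
  (:require [cljs.core.async :as async :refer [chan close! put! <!]])
  (:require-macros [cljs.core.async.macros :refer [go]]))

;; demo setup: two buffered values, then close the channel
(def ch (chan 2))
(put! ch :foo)
(put! ch :bar)
(close! ch)

;; the reduction finishes once the closed channel has been drained
(go
  (prn (<! (async/reduce conj [] ch))))
;; prints: [:foo :bar]
```

For the specific case of collecting into a vector, (async/into [] ch) should behave the same way as (async/reduce conj [] ch).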

If you want to produce a value after a certain amount of time passes whether or not the original channel closes, you can either wrap the original channel in a pass-through channel that will close itself after a timeout passes or you can use a custom approach to the reduction step (perhaps the approach suggested by @leetwinski).
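
The pass-through idea could look roughly like this in ClojureScript. This is only a sketch: close-after is a hypothetical helper (not part of core.async), the namespace name is made up, and the 100 ms deadline is an arbitrary choice for the demo.

```clojure
(ns example.deadline ; hypothetical namespace name
  (:require [cljs.core.async :as async
             :refer [chan close! put! timeout alts! <! >!]])
  (:require-macros [cljs.core.async.macros :refer [go go-loop]]))

(defn close-after
  "Hypothetical helper: returns a channel that relays values from in and
  closes itself when in closes or when ms milliseconds have elapsed,
  whichever happens first."
  [in ms]
  (let [out      (chan)
        deadline (timeout ms)]
    (go-loop []
      ;; :priority true prefers a ready value on in over the deadline
      (let [[v port] (alts! [in deadline] :priority true)]
        (if (or (= port deadline) (nil? v))
          (close! out)
          (do (>! out v)
              (recur)))))
    out))

;; usage: ch is never closed, yet the reduction still completes
(def ch (chan 2))
(put! ch :foo)
(put! ch :bar)

(go
  (prn (<! (async/into [] (close-after ch 100)))))
;; prints [:foo :bar] after roughly 100 ms
```

Note that the original channel stays open in this sketch, so anything put on it after the deadline remains there for a later reader.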

Michał Marczyk answered Feb 15 '23 10:02