I want to create a clojure.core.async channel from an existing one that passes through only specific messages. For that I found a function called filter<:
=> (def c1 (chan))
=> (def c2 (filter< even? c1))
=> (put! c1 1)
=> (put! c1 2)
=> (<!! c2)
2
But the function and its friends are marked as deprecated:
Deprecated - this function will be removed. Use transducer instead
There are some ways to use channels with transducers, such as chan with the xform parameter. How can I build a new channel from an existing one using transducers?
I did some research on this, found a couple of interesting articles (first and second), and then got something working using pipeline:
(require '[clojure.core.async :as async :refer [chan <!! pipeline put!]])
(def c1 (chan))
(def c2 (chan))
(pipeline 4 c2 (filter even?) c1)
(put! c1 1)
(put! c1 2)
(<!! c2)
;;=> 2
The second article I linked makes this a bit cleaner with some helper functions around the pipeline function:
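;; Number of processors available to the JVM.
(defn ncpus []
  (.availableProcessors (Runtime/getRuntime)))
;; Degree of parallelism handed to pipeline.
(defn parallelism []
  (+ (ncpus) 1))
;; Returns a new channel that receives the values of `in` after they pass through `xf`.
(defn add-transducer
  [in xf]
  ;; buffer is not in the :refer list of the require above, so it is called via the async alias
  (let [out (chan (async/buffer 16))]
    (pipeline (parallelism) out xf in)
    out))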
Then you can simply tie channels together with:
(def c1 (chan))
(def c2 (add-transducer c1 (filter even?)))
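To see the pipeline approach end to end, here is a minimal sketch that also closes the source channel and collects everything that arrives on the filtered one. The names in, out and result and the buffer sizes of 10 are my own choices for illustration. It relies on pipeline's documented behaviour of preserving input order and of closing the target channel once the source closes.

```clojure
(require '[clojure.core.async :as async])

;; Buffered so the blocking puts below never wait on a consumer.
(def in  (async/chan 10))
(def out (async/chan 10))

;; Four parallel workers apply the transducer; output order follows input order.
(async/pipeline 4 out (filter even?) in)

(doseq [n (range 10)]
  (async/>!! in n))
(async/close! in) ; closing the source makes pipeline close `out` as well

;; async/into returns a channel that yields one vector once `out` is closed.
(def result (async/<!! (async/into [] out)))
;; result is [0 2 4 6 8]
```

For a cheap predicate like even? the parallelism buys nothing; pipeline pays off mainly when the transducer does real work per message.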
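To complete the answer: as you found yourself, you can use pipe in a similar fashion:
;; pipe has to be added to the :refer list of the require above (or called as async/pipe)
(defn pipe-trans
  [ci xf]
  ;; a channel needs a buffer to accept a transducer, hence the 1
  (let [co (chan 1 xf)]
    (pipe ci co)
    co))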
(def c1 (chan))
(def c2 (pipe-trans c1 (filter even?)))
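A quick way to check the pipe variant is to feed it a few numbers and drain the result. This is only a sketch: source, evens and collected are names I made up, and pipe-trans is repeated with fully qualified calls so the snippet runs on its own. Here the transducer runs on the output channel itself rather than in parallel workers.

```clojure
(require '[clojure.core.async :as async])

;; Same helper as above, written with the async alias so no :refer is needed.
(defn pipe-trans
  [ci xf]
  (let [co (async/chan 1 xf)]
    (async/pipe ci co)
    co))

(def source (async/chan 10)) ; buffered so the puts below do not block
(def evens  (pipe-trans source (filter even?)))

(doseq [n (range 10)]
  (async/>!! source n))
(async/close! source) ; pipe closes `evens` when `source` closes

;; Collect everything that made it through the filter.
(def collected (async/<!! (async/into [] evens)))
;; collected is [0 2 4 6 8]
```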