Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Why does `core.async/pipeline` return a channel?

I just noticed that the pipeline family returns a channel which seemingly operates completely independently of the purpose of the pipeline and it's related channels.

Notice in the following example you can >! / <! from pipes and a> / b> separately, and they're unrelated.

As far as I understand, pipelines should be a no-op, and return nil while setting up the sideffecting transduction from a> to b>.

So, what am I missing, and why does pipeline return a channel?

(def a> (chan))
(def b> (chan))
(def pipes (pipeline-blocking 4
                     b>
                     (map clojure.string/upper-case)
                     a>))
(go (>! pipes "hello world"))
(go (println "Pipes: " (<! pipes)))
(go (>! a> "apples are gooood"))
(go (println "B: " (<! b>)))
like image 457
Josh.F Avatar asked Nov 07 '16 20:11

Josh.F


2 Answers

You get back a channel which is closed when there are no more elements to copy. That is, after a> is closed and all elements from it have been made upper-case and placed onto b>. You can <! from the resulting channel to find out when the pipelining operation is done, if you care, or you can just throw away the channel. You probably shouldn't write to it.

This is a common pattern for a lot of async operations, and indeed often happens implicitly: every go block returns a channel which has the block's return value written to it when the block completes, and many async operations use a go block as their return value, so you automatically get this "job done" channel as a result.

like image 139
amalloy Avatar answered Sep 24 '22 19:09

amalloy


To expound on @amalloy's answer, in the following example, a> and b> have a true put onto them when they are able to complete. Since chan> is unbuffered, they can't complete until another process pulls off of them, ie the println at the end.

If chan> were buffered, a> and b> can >! immediately, so the print immediately.

(def chan> (chan 4))
(def a> (go (>! chan> "Apple")))
(go (println "from a>: " (<! a>)))
(def b> (go (>! chan> "Ball")))
(go (println "from b>: " (<! b>)))

(go (println "from chan>: "(<! chan>)))
;; => from chan>: Apple
;; => from a>: true
(go (println "from chan>: "(<! chan>)))
;; => from chan>: Ball
;; => from b>: true

This is the same idea behind pipelines.


;; Pipeline-specific

(def a> (chan))
(def b> (chan))
(def p> (pipeline 4 b> (map clojure.string/upper-case) a>))

;; this won't happen until `a>` `close!`s
(go (println "pipeline is done: " (<! p>)))

;; execute the following 2 lines ad lib
(go (>! a> "hi there"))
(go (println "from b>: " (<! b>)))

(comment
  (close! a>) ; triggers the "pipeline is done"
  (close! b>)) ; doesn't trigger it, but `b>` now only returns nil when taking
like image 23
Josh.F Avatar answered Sep 21 '22 19:09

Josh.F