I just noticed that the `pipeline` family returns a channel which seemingly operates completely independently of the purpose of the pipeline and its related channels.

Notice in the following example that you can `>!` / `<!` from `pipes` and from `a>` / `b>` separately, and they're unrelated.

As far as I understand, `pipeline`s should be a no-op and return `nil` while setting up the side-effecting transduction from `a>` to `b>`.

So, what am I missing, and why does `pipeline` return a channel?
(require '[clojure.core.async :refer [chan go >! <! pipeline-blocking]]
         'clojure.string)
(def a> (chan))
(def b> (chan))
(def pipes (pipeline-blocking 4
                              b>
                              (map clojure.string/upper-case)
                              a>))
(go (>! pipes "hello world"))
(go (println "Pipes: " (<! pipes)))
(go (>! a> "apples are gooood"))
(go (println "B: " (<! b>)))
You get back a channel which is closed when there are no more elements to copy; that is, it closes after `a>` is closed and all elements from it have been made upper-case and placed onto `b>`. You can `<!` from the resulting channel to find out when the pipelining operation is done, if you care, or you can just throw away the channel. You probably shouldn't write to it.

This is a common pattern for a lot of async operations, and indeed it often happens implicitly: every `go` block returns a channel which has the block's return value written to it when the block completes, and many async operations use a `go` block as their return value, so you automatically get this "job done" channel as a result.
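As a rough sketch of the "job done" channel described above, the snippet below first reads the return value of a plain `go` block, then waits on the channel returned by `pipeline`. The names `in>`, `out>`, `done>`, `sum>`, and the buffer size of 8 are illustrative assumptions, not part of the question; the buffers are only there so the example can run to completion without a separate consumer.

```clojure
(require '[clojure.core.async :refer [chan go pipeline close! <!! >!!]]
         '[clojure.string :as str])

;; A go block returns a channel that receives the block's return value.
(def sum> (go (+ 1 2)))
(def sum (<!! sum>)) ; 3

;; Hypothetical buffered channels; size 8 is an arbitrary choice so that
;; puts and the pipeline itself do not need a waiting consumer.
(def in> (chan 8))
(def out> (chan 8))

;; done> is the channel returned by pipeline. It carries no values;
;; it simply closes once in> is closed and everything has been copied.
(def done> (pipeline 4 out> (map str/upper-case) in>))

(>!! in> "hello")
(>!! in> "world")
(close! in>)

;; Blocks until the pipeline has finished; a closed channel yields nil.
(def done-val (<!! done>))

;; pipeline closes out> by default, so the third take returns nil.
(def results [(<!! out>) (<!! out>) (<!! out>)])
;; results is ["HELLO" "WORLD" nil]
```

Taking from `done>` is purely a synchronization point here: the transformed values still arrive on `out>`, never on `done>`.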
To expound on @amalloy's answer: in the following example, the `go` blocks `a>` and `b>` have a `true` (the return value of `>!`) put onto their result channels when they are able to complete. Since `chan>` is unbuffered, they can't complete until another process takes from `chan>`, i.e. the `println` at the end.

If `chan>` were buffered, `a>` and `b>` could `>!` immediately, so they would print immediately.
(def chan> (chan)) ; unbuffered; try (chan 4) to see the puts complete immediately
(def a> (go (>! chan> "Apple")))
(go (println "from a>: " (<! a>)))
(def b> (go (>! chan> "Ball")))
(go (println "from b>: " (<! b>)))
(go (println "from chan>: " (<! chan>)))
;; => from chan>: Apple
;; => from a>: true
(go (println "from chan>: " (<! chan>)))
;; => from chan>: Ball
;; => from b>: true
This is the same idea behind `pipeline`s.
;; Pipeline-specific
(def a> (chan))
(def b> (chan))
(def p> (pipeline 4 b> (map clojure.string/upper-case) a>))
;; this won't happen until `a>` `close!`s
(go (println "pipeline is done: " (<! p>)))
;; execute the following 2 lines ad lib
(go (>! a> "hi there"))
(go (println "from b>: " (<! b>)))
(comment
  (close! a>)  ; triggers the "pipeline is done"
  (close! b>)) ; doesn't trigger it, but `b>` now only returns nil when taking