I'm trying to make a conditional transducer in Clojure as follows:
(defn if-xf
  "Takes a predicate and two transducers.
  Returns a new transducer that routes the input to one of the transducers
  depending on the result of the predicate."
  [pred a b]
  (fn [rf]
    (let [arf (a rf)
          brf (b rf)]
      (fn
        ([] (rf))
        ([result]
         (rf result))
        ([result input]
         (if (pred input)
           (arf result input)
           (brf result input)))))))
It is pretty useful in that it lets you do stuff like this:
;; multiply odd numbers by 100, square the evens.
(= [0 100 4 300 16 500 36 700 64 900]
   (sequence
     (if-xf odd? (map #(* % 100)) (map (fn [x] (* x x))))
     (range 10)))
However, this conditional transducer does not work very well with transducers that perform cleanup in their 1-arity branch:
;; negs are multiplied by 100, non-negs are partitioned by 2
;; BUT! where did 6 go?
;; expected: [-600 -500 -400 -300 -200 -100 [0 1] [2 3] [4 5] [6]]
;;
(= [-600 -500 -400 -300 -200 -100 [0 1] [2 3] [4 5]]
   (sequence
     (if-xf neg? (map #(* % 100)) (partition-all 2))
     (range -6 7)))
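To make the missing 6 concrete, here is a minimal sketch of how partition-all behaves when driven by hand. The helper name emitted-by-partition-all and the collecting rf are made up for illustration; the point is only that the trailing partial group stays in the transducer's private buffer until its 1-arity (completion) is called, which the if-xf above never does.

```clojure
(defn emitted-by-partition-all
  "Feeds xs one at a time through (partition-all 2) and reports what the
  downstream rf has received before and after the completion call.
  Hypothetical helper, for illustration only."
  [xs]
  (let [out  (atom [])
        ;; downstream rf that just records whatever it is handed
        rf   (fn ([acc] acc)
                 ([acc x] (swap! out conj x) acc))
        step ((partition-all 2) rf)]
    (doseq [x xs] (step nil x))
    (let [before @out]
      (step nil) ; completion: flushes the partial group, if any
      {:before-completion before
       :after-completion  @out})))

(emitted-by-partition-all [0 1 2])
;; => {:before-completion [[0 1]], :after-completion [[0 1] [2]]}
```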
Is it possible to tweak the definition of if-xf to handle the case of transducers with cleanup?
I tried this, but it behaves oddly:
(defn if-xf
  "Takes a predicate and two transducers.
  Returns a new transducer that routes the input to one of the transducers
  depending on the result of the predicate."
  [pred a b]
  (fn [rf]
    (let [arf (a rf)
          brf (b rf)]
      (fn
        ([] (rf))
        ([result]
         (arf result) ;; new!
         (brf result) ;; new!
         (rf result))
        ([result input]
         (if (pred input)
           (arf result input)
           (brf result input)))))))
Specifically, the flushing only happens at the very end, after all other output:
;; the [0] at the end should appear just before the 100.
(= [[-6 -5] [-4 -3] [-2 -1] 100 200 300 400 500 600 [0]]
   (sequence
     (if-xf pos? (map #(* % 100)) (partition-all 2))
     (range -6 7)))
Is there a way to make this branching/conditional transducer without storing the entire input sequence in local state within this transducer (i.e. doing all the processing in the 1-arity branch upon cleanup)?
The idea is to complete every time the transducer switches over. IMO this is the only way to do it without buffering:
(defn if-xf
  "Takes a predicate and two transducers.
  Returns a new transducer that routes the input to one of the transducers
  depending on the result of the predicate."
  [pred a b]
  (fn [rf]
    (let [arf (volatile! (a rf))
          brf (volatile! (b rf))
          a?  (volatile! nil)] ; branch taken by the previous input; nil before the first
      (fn
        ([] (rf))
        ([result]
         (let [crf (if @a? @arf @brf)]
           (-> result crf rf)))
        ([result input]
         (let [p?        (pred input)
               [xrf crf] (if p? [@arf @brf] [@brf @arf])
               switched? (some-> @a? (not= p?))]
           (vreset! a? p?)
           ;; return the step result (not the value of vreset!) so this also
           ;; works outside of sequence, which ignores the accumulator
           (if switched?
             (-> result crf (xrf input))
             (xrf result input))))))))
(sequence (if-xf pos? (map #(* % 100)) (partition-all 2)) [0 1 0 1 0 0 0 1])
; => ([0] 100 [0] 100 [0 0] [0] 100)
I think your question is ill-defined. What exactly do you want to happen when the transducers have state? For example, what do you expect this to do:
(sequence
  (if-xf even? (partition-all 3) (partition-all 2))
  (range 14))
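For reference, here is a sketch of what the question's first definition does with that input. It is copied under the made-up name if-xf-v1 so it does not clash with the later versions. Each branch keeps its own buffer, so groups come out whenever one of them fills up, interleaved in an order that is hard to call obviously right or wrong, and the leftovers 12 and 13 are never flushed.

```clojure
;; The first definition from the question, renamed for this example.
(defn if-xf-v1
  [pred a b]
  (fn [rf]
    (let [arf (a rf)
          brf (b rf)]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (if (pred input)
           (arf result input)
           (brf result input)))))))

(sequence
  (if-xf-v1 even? (partition-all 3) (partition-all 2))
  (range 14))
;; => ([1 3] [0 2 4] [5 7] [6 8 10] [9 11])
```

Whether [1 3] should really come before [0 2 4], or whether the groups should be cut at every switch instead, is exactly the part the question leaves open.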
Furthermore, sometimes reducing functions have work to do at the beginning and the end and can't be restarted arbitrarily. For example, here is a reducing function that computes the mean:
(defn mean
  ([] {:count 0, :sum 0})
  ([result] (double (/ (:sum result) (:count result))))
  ([result x]
   (update-in
     (update-in result [:count] inc)
     [:sum] (partial + x))))
(transduce identity mean [10 20 40 40]) ;27.5
Now let's take the average, where anything below 20 counts for 20, but everything else is decreased by 1:
(transduce
  (if-xf
    (fn [x] (< x 20))
    (map (constantly 20))
    (map dec))
  mean [10 20 40 40]) ;29.25
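To illustrate the "can't be restarted arbitrarily" point, here is a rough sketch of what happens if the conditional transducer instead completes a branch every time the predicate flips. The name if-xf-switching is made up, and the definition is a simplified variant of the switch-completing approach, not a definitive implementation. Because map's 1-arity forwards to the downstream rf, the first switch runs mean's completion early and turns the accumulator map into a double, so the next step has nothing to update.

```clojure
(defn mean
  ([] {:count 0, :sum 0})
  ([result] (double (/ (:sum result) (:count result))))
  ([result x]
   (update-in
     (update-in result [:count] inc)
     [:sum] (partial + x))))

;; Hypothetical variant: completes the previously active branch on each switch.
(defn if-xf-switching
  [pred a b]
  (fn [rf]
    (let [arf (a rf)
          brf (b rf)
          a?  (volatile! nil)] ; branch taken by the previous input
      (fn
        ([] (rf))
        ([result]
         (let [crf (if @a? arf brf)]
           (-> result crf rf)))
        ([result input]
         (let [p?        (pred input)
               [xrf crf] (if p? [arf brf] [brf arf])
               switched? (some-> @a? (not= p?))]
           (vreset! a? p?)
           (if switched?
             (xrf (crf result) input) ; crf also completes the downstream rf
             (xrf result input))))))))

(defn try-mean
  "Runs the clamped mean with the switching variant; returns :failed on error."
  [xs]
  (try
    (transduce
      (if-xf-switching #(< % 20) (map (constantly 20)) (map dec))
      mean xs)
    (catch Exception _ :failed)))

(try-mean [10 20 40 40])
;; => :failed
```

With sequence this problem stays hidden, since the rf it uses ignores the accumulator and its completion is harmless to call more than once.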
My answer is the following: I think your original solution is best. It works well using map, which is how you stated the usefulness of the conditional transducer in the first place.