The clojure reference contains the following comments about transducers, which seem like saying something important about the safety of writing and using transducers:
If you have a new context for applying transducers, there are a few general rules to be aware of:
If a step function returns a reduced value, the transducible process must not supply any more inputs to the step function. The reduced value must be unwrapped with deref before completion.
A completing process must call the completion operation on the final accumulated value exactly once.
- A transducing process must encapsulate references to the function returned by invoking a transducer - these may be stateful and unsafe for use across threads.
Can you explain, possibly with some examples, what each of these cases mean? also, what does "context" refer to in this context?
Thanks!
One example of this scenario is take-while
transducer:
(fn [rf]
(fn
([] (rf))
([result] (rf result))
([result input]
(if (pred input)
(rf result input)
(reduced result)))))
As you can see, it can return a reduced
value which means there is no point (and actually it would be an error) to provide more input to such step function - we know already there can be no more values produced.
For example while processing (1 1 3 5 6 8 7)
input collection with odd?
predicate once we reach value 6
there will be no more values returned by a step function created by take-while odd?
transducer.
This is a scenario where a transducer returns a stateful step function. A good example would be partition-by
transducer. For example when (partition-by odd?)
is used by the transducible process for processing (1 3 2 4 5 2)
it will produce ((1 3) (2 4) (5) (6 8))
.
(fn [rf]
(let [a (java.util.ArrayList.)
pv (volatile! ::none)]
(fn
([] (rf))
([result]
(let [result (if (.isEmpty a)
result
(let [v (vec (.toArray a))]
;;clear first!
(.clear a)
(unreduced (rf result v))))]
(rf result)))
([result input]
(let [pval @pv
val (f input)]
(vreset! pv val)
(if (or (identical? pval ::none)
(= val pval))
(do
(.add a input)
result)
(let [v (vec (.toArray a))]
(.clear a)
(let [ret (rf result v)]
(when-not (reduced? ret)
(.add a input))
ret))))))))
If you take a look at the implementation you will notice that the step function won't return it's accumulated values (stored in a
array list) until the predicate function will return a different result (e.g. after a sequence of odd numbers it will receive an even number, it will return a seq of accumulated odd numbers). The issue is if we reach the end of the source data - there will be no chance to observe a change in the predicate result value and the accumulated value wouldn't be returned. Thus the transducible process must call a completion operation of the step function (arity 1) so it can return its accumulated result (in our case (6 8)
).
When a transducible process is executed by passing a source data and transducer instance, it will first call the transducer function to produce a step function. The transducer is a function of the following shape:
(fn [xf]
(fn ([] ...)
([result] ...)
([result input] ...)))
Thus the transducible process will call this top level function (accepting xf
- a reducing function) to obtain the actual step function used for processing the data elements. The issue is that the transducible process must keep the reference to that step function and use the same instance for processing elements from a particular data source (e.g. the step function instance produced partition-by
transducer must be used for processing the whole input sequence as it keeps its internal state as you saw above). Using different instances for processing a single data source would yield incorrect results.
Similarly, a transducible process cannot reuse a step function instance for processing multiple data sources due to the same reason - the step function instance might be stateful and keep an internal state for processing a particular data source. That state would be corrupted when the step function would be used for processing another data source.
Also there is no guarantee if the step function implementation is thread safe or not.
"A new context for applying transducers" means implementing a new type of a transducible process. Clojure provides transducible processes working with collections (e.g. into
, sequence
). core.async library chan
function (one of its arities) accepts a transducer instance as an argument which produces an asynchronous transducible process producing values (that can be consumed from the channel) by applying a transducer to consumed values.
You could for example create a transducible process for handling data received on a socket, or your own implementation of observables.
They could use transducers for transforming the data as transducers are agnostic when it comes where the data comes from (a socket, a stream, collection, an event source etc.) - it is just a function called with individual elements.
They also don't care (and don't know) what should be done with the result they generate (e.g. should it be appended to a result sequence (for example conj
)? should it be sent over network? inserted to a database?) - it's abstracted by using a reducing function that is captured by the step function (rf
argument above).
So instead of creating a step function that just uses conj
or saves elements to db, we pass a function which has a specific implementation of that operation. And your transducible process defines what that operation is.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With