Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to Accumulate Observables

I should define a function, which will return IObservable<'u>

accumulate : ('t -> 'a -> 't * 'u option) -> 't -> IObservable<'a> -> IObservable<'u>

So that my function f t obs' accumulates observable events of obs into an accumulator of type 't and emits an observable event u when 'snd (f acc a)' evaluates to 'Some u' for an observed event 'a'.

So far I have implemented the function below:

let accumulate (f:'t -> 'a -> 't * 'u option) t obs = 
 Observable.scan (fun _ x -> snd (f t x)) None obs

I really don't understand how this observable scan thing works, my function returns IObservable<'u option> in this case. How can I fix that? Am I on the right track?

like image 977
wolf Avatar asked Mar 04 '23 20:03

wolf


2 Answers

The function fun _ x -> snd (f t x) is incomplete. The clue is that the first parameter _ is ignored and the first part of the resulting tuple is thrown away by the call to snd.

There is no accumulation because f t x always calls with the same value t that was passed originally to accumulate. That original t is supposed to be the initial value and should be passed to scan as part of its second parameter.

The first part of the tuple produced by f:'t -> 'a -> 't * 'u optionis the accumulated value. So that is that part that needs to be returned to scan so that it gets passed to f again and accumulated over and over.

In your problem the requirement is to accumulate and also pass an event when the second part of the tuple is Some 'u. So the question is how to do both: accumulate 't and filter 'u?

The answer is by combining the accumulated value with the Some 'u which is what f does. So you need to keep the tuple as the scan state and then afterwards keep only the second part using choose and snd.

This is what you are looking for:

let accumulate (f:'t -> 'a -> 't * 'u option) t obs =
    obs
    |> Observable.scan (fun (acc, _) x -> f acc x) (t, None)
    |> Observable.choose snd

Understanding scan

scan is a function that carries a changing state by passing it to a function together with a series of values. In particular it can be used to accumulate values, for instance an int running total:

let keepTotal obs =
    obs
    |> Observable.scan (fun total v -> total + v) 0

This is equivalent to doing this in imperative code with a mutable total :

let mutable total = 0

let keepTotal2 obs =
    obs
    |> Observable.map (fun v -> 
        total <- total + v
        total
    )

notice how the 2 versions have the same elements:

  • initial value: 0
  • accumulator function: total + v

Of course the second version, even though it uses map, is bad functional code because it uses an external mutable variable which is a big NO NO.

Your original problem could have been solved the same way:

let accumulate2 (f:'t -> 'a -> 't * 'u option) t obs =
    let mutable acc = t
    obs
    |> Observable.choose (fun x ->
        let acc2, uOp = f acc x
        acc <- acc2
        uOp
    )

Even though this one uses a mutable variable which is ugly in functional programming (and unnecessary) it is functionally ok, because the variable acc is internal and no code outside accumulate2 can see it. Still ugly though.

like image 71
AMieres Avatar answered Mar 12 '23 16:03

AMieres


You can chain an Observable.choose after your Observable.scan to get the correct type signature

let accumulate (f:'t -> 'a -> 't * 'u option) t obs =
    obs
    |> Observable.scan (fun _ x -> snd (f t x)) None
    |> Observable.choose id
like image 43
nilekirk Avatar answered Mar 12 '23 15:03

nilekirk