Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Modifying RX Computational Expression Builder to hold previous values

I'm using a slightly modified version of the RX builder presented here:

http://mnajder.blogspot.com/2011/09/when-reactive-framework-meets-f-30.html

Rather than taking IObservable<'T> directly my computational expression has a type of:

type MyType<'a,'b> = MyType of (IObservable<'a> -> IObservable<'b>)

let extract (MyType t) = t

Combinators then take on the form:

let where (f: 'b -> bool) (m:MyType<_,'b>) = MyType(fun input -> (extract m input).Where(f))

Within the expression itself, I often need to reference back to previous values that have been fed into the stream. In order to do so, I've defined a MyType which maintains a rolling immutable list of the n most recent values.

let history n = 
    MyType(fun input ->
        Observable.Create(fun (o:IObserver<_>) ->
            let buffer = new History<_>(n)
            o.OnNext(HistoryReadOnly(buffer))
            input.Subscribe(buffer.Push, o.OnError, o.OnCompleted)
        )
    )

With this, I can now do something like:

let f = obs {
    let! history = history 20
    // Run some other types, and possibly do something with history
}

I am finding that I am using this history quite frequently, ideally I would want to have this embedded directly into IObservable<'a>. Obviously I can't do that. So my question is, what is a reasonable way to introduce this concept of history that I have here. Should I be extending IObservable<'T> (not sure how to do that), wrapping the IObservable<'T>?

I appreciate any suggestions.

Edit: Added full example code.

open System
open System.Collections.Generic
open System.Reactive.Subjects
open System.Reactive.Linq

// Container function
type MyType<'a,'b> = MyType of (IObservable<'a> -> IObservable<'b>)
let extract (MyType t) = t

// Mini Builder
let internal mbind (myTypeB:MyType<'a,'b>) (f:'b -> MyType<'a,'c>) = 
    MyType(fun input ->
        let obsB = extract myTypeB input
        let myTypeC= fun resB -> extract (f resB) input
        obsB.SelectMany(myTypeC)
    )

type MyTypeBuilder() = 
    member x.Bind (m,f) = mbind m f
    member x.Combine (a,b) = MyType(fun input -> (extract a input).Concat(extract b input))
    member x.Yield (r) = MyType(fun input -> Observable.Return(r))
    member x.YieldFrom (m:MyType<_,_>) = m
    member x.Zero() = MyType(fun input -> Observable.Empty())
    member x.Delay(f:unit -> MyType<'a,'b>) = f() 

let mtypeBuilder = new MyTypeBuilder()

// Combinators
let simplehistory = 
    MyType(fun input ->
        Observable.Create(fun (o:IObserver<_>) ->
            let buffer = new List<_>()
            o.OnNext(buffer)
            input.Subscribe(buffer.Add, o.OnError, o.OnCompleted)
        )
    )

let where (f: 'b -> bool) m = MyType(fun input -> (extract m input).Where(f))
let take (n:int) m = MyType(fun input -> (extract m input).Take(n))
let buffer m = MyType(fun input -> (extract m input).Buffer(1))
let stream = MyType(id)

// Example
let myTypeResult (t:MyType<'a,'b>) (input:'a[]) = (extract t (input.ToObservable().Publish().RefCount())).ToArray().Single()

let dat = [|1 .. 20|]

let example = mtypeBuilder {
    let! history = simplehistory
    let! someEven = stream |> where(fun v -> v % 2 = 0) // Foreach Even
    let! firstValAfterPrevMatch = stream |> take 1 // Potentially where a buffer operation would run, all values here are after i.e. we cant get values before last match
    let! odd = stream |> where(fun v -> v % 2 = 1) |> take 2 // Take 2 odds that follow it
    yield (history.[history.Count - 1], history.[0], someEven,firstValAfterPrevMatch, odd) // Return the last visited item in our stream, the very first item, an even, the first value after the even and an odd
}

let result = myTypeResult example dat

val result : (int * int * int * int * int) [] =
  [|(5, 1, 2, 3, 5); (7, 1, 2, 3, 7); (7, 1, 4, 5, 7); (9, 1, 4, 5, 9);
    (9, 1, 6, 7, 9); (11, 1, 6, 7, 11); (11, 1, 8, 9, 11); (13, 1, 8, 9, 13);
    (13, 1, 10, 11, 13); (15, 1, 10, 11, 15); (15, 1, 12, 13, 15);
    (17, 1, 12, 13, 17); (17, 1, 14, 15, 17); (19, 1, 14, 15, 19);
    (19, 1, 16, 17, 19)|]
like image 988
Dave Avatar asked Aug 06 '13 15:08

Dave


3 Answers

Using the standard Rx workflow builder, you can create a function history that handles your example use case:

let history (io:IObservable<_>) = 
  io.Scan(new List<_>(), (fun l t -> l.Add t; l)).Distinct()

let io = new Subject<int>()
let newio = rx { let! history = history io
                 let! even = io.Where(fun v -> v % 2 = 0)
                 let! next = io.Take 1
                 let! odd = io.Where(fun v -> v % 2 = 1).Take 2
                 yield (history.Last(), history.[0], even, next, odd) }

newio.Subscribe(printfn "%O") |> ignore

for i = 1 to 20 do
  io.OnNext i

Extending that to provide history length limits should be trivial. Was there a specific reason you need to define your own type/builder, or was doing that just a means to attain something like this?

Here's an example with combinators. You just have to define the observable outside the rx block. You could get history to work differently with immutable histories rather than a persistent list, so whatever fits your needs.

let history (io:IObservable<_>) = 
  io.Scan(new List<_>(), (fun l t -> l.Add t; l))

let newest (hist:'a List) = hist.Last()
let extract (ioHist:'a List IObservable) = ioHist.Select newest
let take (i:int) (ioHist: 'a List IObservable) = ioHist.Take i
let where (f: 'a -> bool) (ioHist: 'a List IObservable) = ioHist.Where(fun hist -> f(newest hist))

let io = new Subject<int>()
let history1 = history io
let newio =
 rx { let! hist = history1.Distinct()
      let! even = extract (history1 |> where (fun v -> v % 2 = 0))
      let! next = extract (history1 |> take 1)
      let! odd = extract (history1 |> where (fun v -> v % 2 = 1) |> take 2)
      yield (hist.Last(), hist.[0], even, next, odd) }
like image 139
Dax Fohl Avatar answered Oct 20 '22 17:10

Dax Fohl


You can use Observable.Buffer to do this already. Sorry for the C# my F# hat is not thinking today.

IObservable<int> source = ...
IOBservable<IList<int>> buffered = source.Buffer(5,1)

will create you a stream of lists.

Or try to use buffer in LINQ which is more like F# query expressions

Console.WriteLine ("START");
var source = new List<int> () { 1, 2, 3, 4, 5 }.ToObservable ();

// LINQ C#'s Monad sugar
var r = 
        from buffer in source.Buffer (3, 1)
        from x in buffer
        from y in buffer
        select new { x,y};


r.Subscribe (o=>Console.WriteLine (o.x + " " + o.y));
Console.WriteLine ("END");

Note from in LINQ is exactly/almost the same as let! in an f# query expression. The result is below. Also note how I am using buffer later in the expression just like you could in an f# query expression.

START
1 1
1 2
1 3
2 1
2 2
2 3
3 1
3 2
3 3
2 2
2 3
2 4
3 2
3 3
3 4
4 2
4 3
4 4
3 3
3 4
3 5
4 3
4 4
4 5
5 3
5 4
5 5
4 4
4 5
5 4
5 5
5 5
END
like image 28
bradgonesurfing Avatar answered Oct 20 '22 18:10

bradgonesurfing


Sorry, my F# is extremely rusty, but perhaps you are looking for the Scan operator. It will push values to an accumulator as the source produces them and then you can use this accumulator to produce the next value for your projection.

Here (in C#, apologies) we take a sequence of [0..10] which produces these value 100ms apart, and we return a running Sum.

var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
                       .Take(10);

source.Scan(
    new List<long>(),               //Seed value
    (acc, value)=>                  //Accumulator function
        {
            acc.Add(value);
            return acc;
        }
    )
    .Select(accumate=>accumate.Sum())

which produces the values [0,1,3,6,10,15,21,28,36,45] 100ms apart.

I would think with this tool you can manage your History of values (by adding them to a history/accumulator) and then using this History in the Select to project the value that is appropriate.

like image 23
Lee Campbell Avatar answered Oct 20 '22 18:10

Lee Campbell