In Rx, I can create a subject (something) like this:
let s = new Subject<int>()
let subscription =
s.Subscribe (fun x ->
Console.WriteLine x
)
s.OnNext 123
The subject is both observable and an observer. This is useful when turning callbacks into streams.
What is the equivalent using F#'s built-in Observable module?
There's an example implementation on F# Snippets:
Observable.Subject
The Subject type implements both IObserver and IObservable. It is functionally equivalent to the type of the same name in the Reactive Extensions (Rx) library.
module Observable
open System
open System.Collections.Generic
type Subject<'T> () =
let sync = obj()
let mutable stopped = false
let observers = List<IObserver<'T>>()
let iter f = observers |> Seq.iter f
let onCompleted () =
if not stopped then
stopped <- true
iter (fun observer -> observer.OnCompleted())
let onError ex () =
if not stopped then
stopped <- true
iter (fun observer -> observer.OnError(ex))
let next value () =
if not stopped then
iter (fun observer -> observer.OnNext(value))
let remove observer () =
observers.Remove observer |> ignore
member x.Next value = lock sync <| next value
member x.Error ex = lock sync <| onError ex
member x.Completed () = lock sync <| onCompleted
interface IObserver<'T> with
member x.OnCompleted() = x.Completed()
member x.OnError ex = x.Error ex
member x.OnNext value = x.Next value
interface IObservable<'T> with
member this.Subscribe(observer:IObserver<'T>) =
observers.Add observer
{ new IDisposable with
member this.Dispose() =
lock sync <| remove observer
}
do let s = Subject()
use d = s.Subscribe(fun x -> sprintf "%d" x |> Console.WriteLine)
[1..12] |> Seq.iter s.Next
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With