I am a little curious about the code example below and what people think. The idea was to read from a NetworkStream (~20 msg/s) and instead of working in the main, pass things to MainboxProcessor to handle and get things back for bindings when done.
The usual way is to use PostAndReply, but I want to bind to ListView or other control in C#. Must do magic with LastN items and filtering anyway. Plus, Rx has some error handling.
The example below observes numbers from 2..10 and returns "hello X". On 8 it stops like it was EOF. Made it to ToEnumerable because other thread finishes before otherwise, but it works with Subscribe as well.
What bothers me:
open System
open System.Threading
open System.Reactive.Subjects
open System.Reactive.Linq // NuGet, take System.Reactive.Core also.
open System.Reactive.Concurrency
type SerializedLogger() =
let _letters = new Subject<string>()
// create the mailbox processor
let agent = MailboxProcessor.Start(fun inbox ->
// the message processing function
let rec messageLoop (letters:Subject<string>) = async{
// read a message
let! msg = inbox.Receive()
printfn "mailbox: %d in Thread: %d" msg Thread.CurrentThread.ManagedThreadId
do! Async.Sleep 100
// write it to the log
match msg with
| 8 -> letters.OnCompleted() // like EOF.
| x -> letters.OnNext(sprintf "hello %d" x)
// loop to top
return! messageLoop letters
}
// start the loop
messageLoop _letters
)
// public interface
member this.Log msg = agent.Post msg
member this.Getletters() = _letters.AsObservable()
/// Print line with prefix 1.
let myPrint1 x = printfn "onNext - %s, Thread: %d" x Thread.CurrentThread.ManagedThreadId
// Actions
let onNext = new Action<string>(myPrint1)
let onCompleted = new Action(fun _ -> printfn "Complete")
[<EntryPoint>]
let main argv =
async{
printfn "Main is on: %d" Thread.CurrentThread.ManagedThreadId
// test
let logger = SerializedLogger()
logger.Log 1 // ignored?
let xObs = logger
.Getletters() //.Where( fun x -> x <> "hello 5")
.SubscribeOn(Scheduler.CurrentThread)
.ObserveOn(Scheduler.CurrentThread)
.ToEnumerable() // this
//.Subscribe(onNext, onCompleted) // or with Dispose()
[2..10] |> Seq.iter (logger.Log)
xObs |> Seq.iter myPrint1
while true
do
printfn "waiting"
System.Threading.Thread.Sleep(1000)
return 0
} |> Async.RunSynchronously // return an integer exit code
I have done similar things, but using the plain F# Event
type rather than Subject
. It basically lets you create IObservable
and trigger its subscribes - much like your use of more complex Subject
. The event-based version would be:
type SerializedLogger() =
let letterProduced = new Event<string>()
let lettersEnded = new Event<unit>()
let agent = MailboxProcessor.Start(fun inbox ->
let rec messageLoop (letters:Subject<string>) = async {
// Some code omitted
match msg with
| 8 -> lettersEnded.Trigger()
| x -> letterProduced.Trigger(sprintf "hello %d" x)
// ...
member this.Log msg = agent.Post msg
member this.LetterProduced = letterProduced.Publish
member this.LettersEnded = lettersEnded.Publish
The important differences are:
Event
cannot trigger OnCompleted
, so I instead exposed two separate events. This is quite unfortunate! Given that Subject
is very similar to events in all other aspects, this might be a good reason for using subject instead of plain event.
The nice aspect of using Event
is that it is a standard F# type, so you do not need any external dependencies in the agent.
I noticed your comment noting that the first call to Log
was ignored. That's because you subscribe to the event handler only after this call happens. I think you could use ReplaySubject variation on the Subject idea here - it replays all events when you subscribe to it, so the one that happened earlier would not be lost (but there is a cost to caching).
In summary, I think using Subject
is probably a good idea - it is essentially the same pattern as using Event
(which I think is quite standard way of exposing notifications from agents), but it lets you trigger OnCompleted
. I would probably not use ReplaySubject
, because of the caching cost - you just have to make sure to subscribe before triggering any events.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With