
Is returning results from MailboxProcessor via Rx a good idea?

I am a little curious about the code example below and what people think of it. The idea was to read from a NetworkStream (~20 msg/s) and, instead of doing the work on the main thread, pass the messages to a MailboxProcessor to handle and get the results back for bindings when done.

The usual way is to use PostAndReply, but I want to bind to a ListView or other control in C#. I have to do some magic with the last N items and filtering anyway. Plus, Rx has some error handling.
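For comparison, a minimal sketch of the PostAndReply style mentioned above, using only FSharp.Core. The names `Request`, `Greet` and `agent` are made up for illustration and are not part of the code in question.

```fsharp
// Hypothetical message type: a number plus a channel for sending the reply back.
type Request = Greet of int * AsyncReplyChannel<string>

let agent =
    MailboxProcessor.Start(fun inbox ->
        let rec loop () = async {
            // wait for the next request
            let! (Greet (n, reply)) = inbox.Receive()
            // answer through the reply channel supplied by the caller
            reply.Reply(sprintf "hello %d" n)
            return! loop () }
        loop ())

// The caller blocks until the agent has replied.
let answer = agent.PostAndReply(fun reply -> Greet (2, reply))
printfn "%s" answer
```

Each call gets exactly one answer back, which is why this style is awkward when the results should feed a binding as a stream.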

The example below observes numbers from 2..10 and returns "hello X". On 8 it stops, as if it were EOF. I used ToEnumerable because otherwise the main thread finishes before the other thread, but it works with Subscribe as well.

What bothers me:

  1. Passing the Subject around in the recursive loop. I don't see any problem with having around 3-4 of those. Is that a good idea?
  2. The lifetime of the Subject.

open System
open System.Threading
open System.Reactive.Subjects
open System.Reactive.Linq  // NuGet, take System.Reactive.Core also.
open System.Reactive.Concurrency

type SerializedLogger() = 

    let _letters = new Subject<string>()
    // create the mailbox processor
    let agent = MailboxProcessor.Start(fun inbox -> 

        // the message processing function
        let rec messageLoop (letters:Subject<string>) = async{

            // read a message
            let! msg = inbox.Receive()

            printfn "mailbox: %d in Thread: %d" msg Thread.CurrentThread.ManagedThreadId
            do! Async.Sleep 100
            // publish the result, or complete the stream on 8
            match msg with
            | 8 -> letters.OnCompleted() // like EOF.
            | x -> letters.OnNext(sprintf "hello %d" x)

            // loop to top
            return! messageLoop letters
            }

        // start the loop
        messageLoop _letters
        )

    // public interface
    member this.Log msg = agent.Post msg
    member this.Getletters() = _letters.AsObservable()

/// Print line with prefix 1.
let myPrint1 x = printfn "onNext - %s,  Thread: %d" x  Thread.CurrentThread.ManagedThreadId

// Actions
let onNext = new Action<string>(myPrint1)
let onCompleted = new Action(fun _ -> printfn "Complete")

[<EntryPoint>]
let main argv = 
    async{
    printfn "Main is on: %d" Thread.CurrentThread.ManagedThreadId

    // test
    let logger = SerializedLogger()
    logger.Log 1 // ignored?

    let xObs = logger
                .Getletters() //.Where( fun x -> x <> "hello 5")
                .SubscribeOn(Scheduler.CurrentThread)
                .ObserveOn(Scheduler.CurrentThread)
                .ToEnumerable() // this
                //.Subscribe(onNext, onCompleted) // or with Dispose()

    [2..10] |> Seq.iter (logger.Log) 

    xObs |> Seq.iter myPrint1

    while true 
        do 
        printfn "waiting"
        System.Threading.Thread.Sleep(1000)

    return 0
    } |> Async.RunSynchronously // return an integer exit code
Dimka Avatar asked Aug 18 '16 22:08



1 Answer

I have done similar things, but using the plain F# Event type rather than Subject. It basically lets you create an IObservable and notify its subscribers, much like your use of the more complex Subject. The event-based version would be:

type SerializedLogger() = 
   let letterProduced = new Event<string>()
   let lettersEnded = new Event<unit>()
   let agent = MailboxProcessor.Start(fun inbox -> 
     // no Subject parameter is needed; the events are captured by the closure
     let rec messageLoop () = async {
       let! msg = inbox.Receive()
       // Some code omitted
       match msg with
       | 8 -> lettersEnded.Trigger()
       | x -> letterProduced.Trigger(sprintf "hello %d" x)
       return! messageLoop () }
     messageLoop ())

   member this.Log msg = agent.Post msg
   member this.LetterProduced = letterProduced.Publish
   member this.LettersEnded = lettersEnded.Publish

The important differences are:

  • Event cannot trigger OnCompleted, so I instead exposed two separate events. This is quite unfortunate! Given that Subject is very similar to events in all other aspects, this might be a good reason for using subject instead of plain event.

  • The nice aspect of using Event is that it is a standard F# type, so you do not need any external dependencies in the agent.

  • I noticed your comment noting that the first call to Log was ignored. That's because you subscribe only after this call happens. I think you could use the ReplaySubject variation on the Subject idea here: it replays all events when you subscribe to it, so the one that happened earlier would not be lost (but there is a cost to caching).
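The last point can be reproduced without an agent at all. The following is a small sketch using only the F# Event type; the names `letterProduced` and `received` are chosen for illustration.

```fsharp
let letterProduced = new Event<string>()
let received = ResizeArray<string>()

// Triggered before anyone listens: there is no handler yet, so the value is dropped.
letterProduced.Trigger "hello 1"

// Subscribe now; only later triggers reach this handler.
letterProduced.Publish.Add(fun s -> received.Add s)
letterProduced.Trigger "hello 2"

printfn "%A" (List.ofSeq received)
```

Only "hello 2" ends up in `received`. A plain Subject behaves the same way for late subscribers, whereas a ReplaySubject would buffer earlier values and replay them on subscription.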

In summary, I think using Subject is probably a good idea. It is essentially the same pattern as using Event (which I think is a quite standard way of exposing notifications from agents), but it lets you trigger OnCompleted. I would probably not use ReplaySubject, because of the caching cost; you just have to make sure to subscribe before triggering any events.
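As a rough sketch of the "subscribe first, then post" order with the event-based logger: the type is repeated here so the snippet runs on its own, and the filter, the `finished` wait handle and the 5 second timeout are assumptions added for the demonstration.

```fsharp
open System.Threading

type SerializedLogger() =
    let letterProduced = new Event<string>()
    let lettersEnded = new Event<unit>()
    let agent = MailboxProcessor.Start(fun inbox ->
        let rec messageLoop () = async {
            let! msg = inbox.Receive()
            match msg with
            | 8 -> lettersEnded.Trigger()
            | x -> letterProduced.Trigger(sprintf "hello %d" x)
            return! messageLoop () }
        messageLoop ())
    member this.Log (msg: int) = agent.Post msg
    member this.LetterProduced = letterProduced.Publish
    member this.LettersEnded = lettersEnded.Publish

let logger = SerializedLogger()
let received = ResizeArray<string>()
let finished = new ManualResetEventSlim(false)

// Subscribe before posting anything, so no notification is missed.
logger.LetterProduced
|> Observable.filter (fun s -> s <> "hello 5")
|> Observable.add (fun s -> received.Add s)

// The second event plays the role of OnCompleted.
logger.LettersEnded |> Observable.add (fun () -> finished.Set())

[2..8] |> List.iter logger.Log

// Block until the agent has signalled the end (or give up after 5 seconds).
finished.Wait(5000) |> ignore
printfn "%A" (List.ofSeq received)
```

The handlers run on the agent's thread, so for UI binding the notifications would still need to be marshalled to the UI thread.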

Tomas Petricek Avatar answered Nov 15 '22 07:11
