I have an observable chain. The initial observable comes from the network and fires every time a message is ready to be read. The next handler reads the message and deserializes it. I then fork the observable: one branch is the message handler and the other logs the message.
The problem is that, because I'm using Observable, each subscriber runs the whole chain, so I actually try to read the message twice.
I understand that using Event instead of Observable would solve the issue, but then I would have a garbage collection issue that might prevent sockets from being collected.
One solution I thought of is to insert some kind of separator that ends one chain of observables and starts a new one. Does such a function already exist in F# or in another library?
Are there other solutions to the problem?
Edit:
Code example that doesn't work correctly:
    let messagesStream =
        socket.observable |>
        Observable.map (fun () -> socket.read ()) |>
        Observable.map (fun m -> deserialize m)
    messagesStream |> Observable.add (fun m -> printfn "%A" m)
    messagesStream |> Observable.add (fun m -> handle m)
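The double read can be reproduced without a socket. The following is a minimal sketch using only FSharp.Core; `ready` and `reads` are hypothetical stand-ins for `socket.observable` and `socket.read ()`, not part of the original code. Each `Observable.add` creates its own subscription to the source, so the function passed to `Observable.map` runs once per subscriber.

```fsharp
// Hypothetical stand-in for socket.observable: fires when a message is ready.
let ready = Event<unit>()

// Counts how many times the "read" step runs (stand-in for socket.read ()).
let reads = ref 0

let messages =
    ready.Publish
    |> Observable.map (fun () ->
        reads.Value <- reads.Value + 1
        "payload")

// Two subscribers, as in the question: one logs, one handles.
messages |> Observable.add (fun m -> printfn "log: %s" m)
messages |> Observable.add (fun m -> printfn "handle: %s" m)

// A single "message ready" notification...
ready.Trigger ()

// ...runs the mapping once per subscriber, so this prints "reads = 2".
printfn "reads = %d" reads.Value
```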
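The easiest way to add some logging is to use Observable.iter, which runs a side effect on each value and passes the value along unchanged. The chain then has a single subscriber, so each message is read only once. Note that Observable.iter is not part of the Observable module in FSharp.Core; it is provided by libraries such as FSharp.Control.Reactive. It can be used as follows: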
    let messagesStream =
        socket.observable
        |> Observable.map (fun () -> socket.read ())
        |> Observable.map (fun m -> deserialize m)
        |> Observable.iter (printfn "%A")
    messagesStream |> Observable.add (fun m -> handle m)
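If both subscribers need to stay separate, the "separator" mentioned in the question can be approximated with FSharp.Core alone. The sketch below is one possible approach, not an existing library function: `share` is a hypothetical helper that subscribes to the source exactly once and re-broadcasts every value through an internal Event. It returns the subscription as well, on the assumption that disposing it when the socket is closed detaches the relay from the source, which is meant to address the garbage collection concern. `ready` and `reads` are again stand-ins for the socket.

```fsharp
open System

/// Hypothetical helper: subscribe to `source` once and re-broadcast each value.
/// Returns the shared observable plus the single upstream subscription.
let share (source: IObservable<'T>) : IObservable<'T> * IDisposable =
    let relay = Event<'T>()
    let subscription =
        source |> Observable.subscribe (fun value -> relay.Trigger value)
    (relay.Publish :> IObservable<'T>), subscription

// Hypothetical stand-ins for socket.observable and socket.read ().
let ready = Event<unit>()
let reads = ref 0

let messages, subscription =
    ready.Publish
    |> Observable.map (fun () ->
        reads.Value <- reads.Value + 1
        "payload")
    |> share

// Both subscribers attach to the relay, not to the upstream chain.
messages |> Observable.add (fun m -> printfn "log: %s" m)
messages |> Observable.add (fun m -> printfn "handle: %s" m)

ready.Trigger ()
printfn "reads = %d" reads.Value   // the upstream mapping ran once

// Detach from the source when the socket is no longer needed.
subscription.Dispose ()
```

Libraries built on Rx.NET offer a similar idea through publish-style operators, which may be preferable to a hand-written helper if such a dependency is acceptable.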