I'm writing a socket server which should handle every message received from every client connected to it.
All the messages are enqueued in an observable, so it can be subscribed and observed outside.
To ensure that all client socket messages are put on the same observable I've used the following snippet:
private Subject<IObservable<Msg>> _msgSubject;
public IObservable<Msg> Messages { get; private set; }
public SocketServer()
{
// ...
// Initialization of server socket
// ...
_msgSubject = new Subject<IObservable<Msg>>();
Messages = _msgSubject.Merge();
}
private void OnNewConnection(ISocket socket)
{
var evtObservable = Observable.FromEvent<Action<byte[]>, byte[]>(action => action.Invoke, h => socket.OnMessage += h, h => socket.OnMessage -= h);
_msgSubject.OnNext(evtObservable);
}
Now, I've inspected the (de)allocation of the memory and the problem is that even if the socket is properly closed, there is still references of the relative observable added into the subject; also, the deregistration of the event is never called.
So, here's the question: is there a way to force the removal of the "socket observable" from the subject?
Maybe something to trigger the OnComplete of the socket observable should do the job, but how?
Currently you don't expose anyway to signify the termination (OnComplete
or OnError
) of the inner Observable sequences.
You allow the unregistering of the event handler if the consumer happens to dispose of the subscription, but this is consumer driven.
Do you have any event that is raised on the ISocket
interface that represents the closing of the socket?
If so you can add an Observable.FromEvent
wrapper around that, then use that observable sequence to terminate the OnMessage
sequence.
var closeObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
action => action.Invoke,
h => socket.OnClose+= h,
h => socket.OnClose -= h);
var msgObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
action => action.Invoke,
h => socket.OnMessage += h,
h => socket.OnMessage -= h);
msgObservable.TakeUntil(closeObservable)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With