
Disposing combined observables from events

I'm writing a socket server which should handle every message received from every client connected to it.

All the messages are pushed onto a single observable, so that it can be subscribed to and observed from outside.

To ensure that the messages from all client sockets end up on the same observable, I've used the following snippet:

private Subject<IObservable<Msg>> _msgSubject;
public IObservable<Msg> Messages { get; private set; }

public SocketServer()
{
    // ...
    // Initialization of server socket
    // ...
    _msgSubject = new Subject<IObservable<Msg>>();
    Messages = _msgSubject.Merge();
}

private void OnNewConnection(ISocket socket)
{
    var evtObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
        action => action.Invoke,
        h => socket.OnMessage += h,
        h => socket.OnMessage -= h);

    _msgSubject.OnNext(evtObservable);
}

I've inspected the memory (de)allocation, and the problem is that even when the socket is properly closed, there are still references to the corresponding observable that was added to the subject. In addition, the event handler is never unregistered.

So, here's the question: is there a way to force the removal of the "socket observable" from the subject?

Maybe triggering OnCompleted on the socket observable would do the job, but how?

Asked by Atropo, Oct 18 '22 11:10


1 Answer

Currently you don't expose any way to signal the termination (OnCompleted or OnError) of the inner observable sequences. The event handler is unregistered only if the consumer happens to dispose of its subscription, so cleanup is entirely consumer-driven.

Do you have any event on the ISocket interface that is raised when the socket closes? If so, you can add an Observable.FromEvent wrapper around it, then use that observable sequence to terminate the OnMessage sequence. When the close sequence produces a value, TakeUntil completes the message sequence, which disposes the underlying event subscription and lets Merge release the inner observable.

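// Fires when the socket closes. The delegate type here is copied from OnMessage;
// adjust it to the actual signature of ISocket.OnClose.
var closeObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
    action => action.Invoke,
    h => socket.OnClose += h,
    h => socket.OnClose -= h);

var msgObservable = Observable.FromEvent<Action<byte[]>, byte[]>(
    action => action.Invoke,
    h => socket.OnMessage += h,
    h => socket.OnMessage -= h);

// Push the terminating sequence into the subject instead of the raw one.
_msgSubject.OnNext(msgObservable.TakeUntil(closeObservable));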
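
To check the behaviour in isolation, here is a minimal console sketch. It assumes the System.Reactive package is referenced. FakeSocket, its parameterless OnClose event, MessageHandlerCount, and Demo are hypothetical stand-ins invented for this illustration, since the real ISocket is not shown in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical stand-in for ISocket; the event signatures are assumptions.
public sealed class FakeSocket
{
    public event Action<byte[]> OnMessage;
    public event Action OnClose;

    // Number of handlers currently attached to OnMessage.
    public int MessageHandlerCount => OnMessage?.GetInvocationList().Length ?? 0;

    public void Send(byte[] data) => OnMessage?.Invoke(data);
    public void Close() => OnClose?.Invoke();
}

public static class Demo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Same shape as the question: a subject of inner sequences, merged into one.
        var connections = new Subject<IObservable<byte[]>>();
        using var subscription = connections
            .Merge()
            .Subscribe(m => log.Add($"msg:{m.Length}"));

        var socket = new FakeSocket();

        var closed = Observable.FromEvent(
            h => socket.OnClose += h,
            h => socket.OnClose -= h);

        var messages = Observable.FromEvent<byte[]>(
            h => socket.OnMessage += h,
            h => socket.OnMessage -= h);

        // The inner sequence ends as soon as the close event fires.
        connections.OnNext(messages.TakeUntil(closed));

        log.Add($"handlers:{socket.MessageHandlerCount}");
        socket.Send(new byte[3]);
        socket.Close();
        log.Add($"handlers:{socket.MessageHandlerCount}");

        // Sent after close: no handler should be attached any more.
        socket.Send(new byte[5]);
        return log;
    }

    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

In a plain console application (no SynchronizationContext), the handler count should drop back to zero right after Close(), and the message sent afterwards should not reach the merged sequence. Note that the outer merged sequence stays open, so other connections are unaffected.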
Answered by Lee Campbell, Oct 21 '22 05:10