I'm using the IObserver
/IObservable
interfaces in a project of mine.
The CommandReader
is an IObservable
which continuously reads data from a stream and then passes it to its CommandHandler
IObserver
(s).
When the underlying stream is closed (e.g. connection dies) then I will notify the observers with the IObserver.OnError(exception)
method
What should the observer do when it receives this notification? Should it release itself from the observable? Or is that the parent class' responsibility?
For starters you generally should not implement your own IObservable
and IObserver
objects. Your question kind of hints at why. It is very difficult to get the underlying behaviour correct.
Now each call to IObservable.Subscribe
returns an IDisposable
. This is used if the caller of Subscribe
wishes to unsubscribe from an observable before an OnCompleted
or OnError
is called. However, if an OnCompleted
or OnError
is called then the IDisposable
is automatically disposed of. So effectively Rx automatically cleans up after itself when an observable collection complete.
Each individual observer does not need to manage it's own subscription lifetime. The observer only needs to respond to the OnCompleted
/OnError
message.
In your code I would suggest that you think about changing your code slightly. I would expect a CommandReaderPublisher
class with a Subscribe
method might be more appropriate than a CommandReader
class. Once an Rx stream completes it cannot continue to be used.
Also I wonder if calling OnCompleted
would be better than OnError(exception)
when the underlying stream closes. If an error occurs then that's fine, but if it closes then ONCompleted
might be better.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With