I've recently been reading about IObservable. So far, I've looked at various SO questions and watched a video on what they can do. I think the whole "push" mechanism is brilliant, but I'm still trying to figure out what exactly everything does. From my reading, an IObservable is something that can be 'watched', and IObservers are the 'watchers'.
So now I'm off to try and implement this in my application. There are a few things I would like to nut out before I get started. I've seen IObservable described as the opposite (the dual) of IEnumerable; however, I can't really see where in my particular case I could incorporate it into my app.
Currently, I make heavy use of events, so much so that the 'plumbing' is starting to get unmanageable. I would think that IObservable can help me out here.
Consider the following design, which is my wrapper around the I/O within my application (FYI, I typically have to deal with strings).
I have a base interface called IDataIO:
public interface IDataIO
{
    // Delegate types are left out here, as in my real code they vary.
    event OnDataReceived;
    event OnTimeout;
    event OnTransmit;
}
I currently have three classes that implement this interface. Each of them uses async method calls in some way, which introduces some multithreaded processing:
public class SerialIO : IDataIO { /* ... */ }
public class UdpIO : IDataIO { /* ... */ }
public class TcpIO : IDataIO { /* ... */ }
There is a single instance of each of these classes wrapped up into my final class, called IO (which also implements IDataIO - adhering to my strategy pattern):
public class IO : IDataIO
{
    public SerialIO Serial;
    public UdpIO Udp;
    public TcpIO Tcp;
}
I have used the strategy pattern to encapsulate these three classes, so that switching between the different IDataIO instances at runtime is 'invisible' to the end user. As you can imagine, this has led to quite a bit of 'event plumbing' in the background.
So, how can I use 'push' notification in my case? Instead of subscribing to events (DataReceived etc.), I would like to simply push the data to anyone who is interested. I'm a bit unsure of where to get started. I'm still toying with the idea of the generic Subject class and its various incarnations (ReplaySubject/AsyncSubject/BehaviorSubject). Could someone please enlighten me on this one (maybe with reference to my design)? Or is this simply not an ideal fit for IObservable?
PS. Feel free to correct any of my 'misunderstandings' :)
Observables are great for representing streams of data, so your DataReceived event would map nicely onto the observable pattern, as something like IObservable<byte> or IObservable<byte[]>. You also get the added benefit of OnError and OnCompleted, which are handy.
In terms of implementing it, it's hard to say for your exact scenario, but we often use Subject<T> as the underlying source and call OnNext to push data. Maybe something like:
// Using a subject is probably the easiest way to push data to an Observable.
// It wraps up both IObservable and IObserver, so you almost never use IObserver directly.
private readonly Subject<byte> subject = new Subject<byte>();

private void OnPort_DataReceived(object sender, EventArgs e)
{
    // This pushes the data to the IObserver, which is probably just a wrapper
    // around your subscribe delegate if you're using the Rx extensions.
    this.subject.OnNext(port.Data); // pseudo code
}
You can then expose the subject through a property:
public IObservable<byte> DataObservable
{
    get { return this.subject; } // Or this.subject.AsObservable();
}
You can replace your DataReceived event on IDataIO with an IObservable<T> and have each strategy class handle its data in whichever manner it needs and push it to the Subject<T>.
On the other side, whoever subscribes to the Observable can either handle it like an event (just by using an Action<byte[]>) or perform some really useful work on the stream with Select, Where, Buffer, etc.
private IDataIO dataIo = new ...

private void SubscribeToData()
{
    dataIo.DataObservable.Buffer(16).Subscribe(On16Bytes);
}

private void On16Bytes(IList<byte> bytes)
{
    // do stuff
}
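To make the operator chaining a little more concrete, here is a minimal sketch that filters, projects and buffers a byte stream. It assumes the System.Reactive package is referenced; the class name OperatorSketch, the printable-ASCII filter and the sample input are made up for illustration and are not part of the asker's design.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Text;

// Hypothetical demo class; assumes the System.Reactive package is referenced.
public static class OperatorSketch
{
    public static List<string> Run()
    {
        var seen = new List<string>();

        // The subject stands in for whatever pushes incoming bytes.
        using var source = new Subject<byte>();

        using var subscription = source
            .Where(b => b >= 0x20 && b < 0x7F)               // keep printable ASCII only
            .Select(b => (char)b)                            // byte -> char
            .Buffer(4)                                       // group into chunks of 4
            .Select(chars => new string(chars.ToArray()))    // chunk -> string
            .Subscribe(s => seen.Add(s));

        // Sample input, invented for this sketch; the CR/LF bytes are filtered out.
        foreach (byte b in Encoding.ASCII.GetBytes("PING\r\nPONG"))
        {
            source.OnNext(b);
        }

        return seen;
    }
}
```

Calling OperatorSketch.Run() should collect one string per full group of four printable characters. The subscriber never sees the raw bytes, only the shaped stream.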
ReplaySubject/ConnectableObservables are great when you know your subscriber is going to arrive late to the party but still needs to catch up on all of the events. The source caches everything it has pushed and replays it for each subscriber. Only you can say whether that's the behaviour you actually need (but be careful, because it will cache everything, which is obviously going to increase your memory usage).
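As a rough illustration of the late-subscriber behaviour, the sketch below pushes three values before anyone subscribes. It assumes the System.Reactive package is referenced; the class name ReplaySketch and the log format are invented for the example. It also shows the ReplaySubject<T>(int bufferSize) constructor, which is one way to cap how much gets cached.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical demo class; assumes the System.Reactive package is referenced.
public static class ReplaySketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        using var everything = new ReplaySubject<byte>();   // caches every value
        using var lastTwo = new ReplaySubject<byte>(2);     // caches only the latest 2

        // Values are pushed before there are any subscribers.
        foreach (byte b in new byte[] { 1, 2, 3 })
        {
            everything.OnNext(b);
            lastTwo.OnNext(b);
        }

        // Late subscribers receive the cached values when they subscribe.
        using (everything.Subscribe(b => log.Add("all:" + b)))
        using (lastTwo.Subscribe(b => log.Add("last2:" + b)))
        {
        }

        return log;
    }
}
```

With a plain Subject<byte> in place of the replay subjects, the late subscribers in this sketch would receive nothing, since all values were pushed before they subscribed.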
When I was learning about Rx, I found the blog series on Rx at http://leecampbell.blogspot.co.uk/ very informative for understanding the theory (the posts are a little dated now and the APIs have changed, so watch out for that).
This is definitely an ideal case for observables. The IO class will probably see the most improvement. To start with, let's change the interface to use observables and see how simple the combining class becomes.
public interface IDataIO
{
    // You will have to fill in the types here. Either the event args
    // the events provide now, or byte[], or something relevant would be good.
    // (Interfaces cannot declare fields, so these are read-only properties.)
    IObservable<???> DataReceived { get; }
    IObservable<???> Timeout { get; }
    IObservable<???> Transmit { get; }
}
public class IO : IDataIO
{
    public SerialIO Serial;
    public UdpIO Udp;
    public TcpIO Tcp;

    public IObservable<???> DataReceived
    {
        get
        {
            return Observable.Merge(Serial.DataReceived,
                                    Udp.DataReceived,
                                    Tcp.DataReceived);
        }
    }

    // similarly for the other two observables
}
SIDE NOTE: You may notice that I changed the interface member names. In .NET, events are typically named <event name> and the functions that raise them are called On<event name>.
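For reference, a minimal sketch of that naming convention using a plain .NET event. The class Port and the method SimulateIncoming are hypothetical names used only for illustration.

```csharp
using System;

// Hypothetical class, used only to illustrate the naming convention.
public class Port
{
    // The event is named after what happened...
    public event EventHandler<byte[]> DataReceived;

    // ...and the method that raises it gets the "On" prefix.
    protected virtual void OnDataReceived(byte[] data)
    {
        DataReceived?.Invoke(this, data);
    }

    // Stand-in for whatever actually produces data in a real class.
    public void SimulateIncoming(byte[] data)
    {
        OnDataReceived(data);
    }
}
```

A caller subscribes with port.DataReceived += handler and never calls OnDataReceived directly.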
For the producing classes, you have a few options that depend on the actual sources. Suppose you are using the .NET SerialPort class in SerialIO and that DataReceived returns an IObservable<byte[]>. Since SerialPort already has an event for data received, you can use that directly to make the observable you need.
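using System;
using System.IO.Ports;
using System.Reactive.Linq;

public class SerialIO : IDataIO
{
    private SerialPort _port;

    public IObservable<byte[]> DataReceived
    {
        get
        {
            // Wrap the existing SerialPort.DataReceived event as an observable.
            return Observable.FromEventPattern<SerialDataReceivedEventHandler,
                                               SerialDataReceivedEventArgs>(
                    h => _port.DataReceived += h,
                    h => _port.DataReceived -= h)
                .Where(ep => ep.EventArgs.EventType == SerialData.Chars)
                .Select(ep =>
                {
                    // Read whatever is currently available on the port.
                    byte[] buffer = new byte[_port.BytesToRead];
                    _port.Read(buffer, 0, buffer.Length);
                    return buffer;
                });
        }
    }

    // Timeout and Transmit are omitted here.
}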
For cases where you don't have an existing event source, you may need to use a subject as RichK suggested. His answer covers that usage pattern quite well, so I won't duplicate that here.
You did not show how you use this interface, but depending on the use case, it may make more sense to have other functions on these classes return IObservables themselves and do away with these "events" entirely. With an event-based async pattern, the events have to be separate from the function you call to trigger the work. With observables, you can return them from the function instead, which makes it more obvious what you are subscribing to. That approach also allows the observable returned from each call to send OnError and OnCompleted messages to signal the end of an operation. Based on your use of a combining class, I don't expect this to be useful in this particular case, but it is something to keep in mind.
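A minimal sketch of that idea, assuming the System.Reactive package is referenced. The Send method, its parameters and the canned replies are hypothetical; a real implementation would talk to the port or socket instead of looping over a list.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical demo class; assumes the System.Reactive package is referenced.
public static class RequestSketch
{
    // The call itself returns the stream of replies for this one request,
    // so there is no separate event to wire up.
    public static IObservable<string> Send(string request, IEnumerable<string> cannedReplies)
    {
        return Observable.Create<string>(observer =>
        {
            if (string.IsNullOrEmpty(request))
            {
                // Failure of this operation is reported through OnError.
                observer.OnError(new ArgumentException("Request must not be empty.", nameof(request)));
                return Disposable.Empty;
            }

            // Stand-in for real I/O: push each reply to the subscriber.
            foreach (var reply in cannedReplies)
            {
                observer.OnNext(reply);
            }

            // Signals that this particular operation has finished.
            observer.OnCompleted();
            return Disposable.Empty;
        });
    }
}
```

A caller would then write something like RequestSketch.Send("PING", replies).Subscribe(onNext, onError, onCompleted), where the three handlers are the caller's own delegates. Nothing runs until Subscribe is called, and each subscription triggers its own run of the operation.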