I'm just learning Rx and was trying to implement a "NMEA sentence reader" from a GPS device using SerialPort. The fact that it's GPS data is of lesser importance to the question, so let's just clarify that the NMEA format consists of lines, and a '$' sign represents the start of a new entry, so you get "sentences" that look similar to:
$[data for first line goes here]
$[data for second line goes here]
...
Hooking up the SerialPort's DataReceived event is pretty straightforward:
var port = new SerialPort("COM3", 4800);
var serialPortSource = Observable.FromEventPattern<
        SerialDataReceivedEventHandler,
        SerialDataReceivedEventArgs>
    (
        handler => port.DataReceived += handler,
        handler => port.DataReceived -= handler
    ).Select(e => port.ReadExisting());
This gives an IObservable<string>, but obviously the strings returned here are not necessarily aligned as NMEA sentences. For example, for the above sample data, one could get a sequence like this:
$[data for first line g
oes here]\r\n$[data for
second line goes here]
How do I properly turn this into a sequence of actual sentences? In the IEnumerable world, I would probably start from a sequence of chars and write an extension method similar to this:
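public static IEnumerable<string> ToNmeaSentence(
    this IEnumerable<char> characters
)
{
    var sb = new StringBuilder();
    foreach (var ch in characters)
    {
        // A '$' starts a new sentence, so emit whatever was buffered so far.
        if (ch == '$' && sb.Length > 0)
        {
            yield return sb.ToString();
            sb.Clear();
        }
        sb.Append(ch);
    }
    // Emit the last buffered sentence once the input ends.
    if (sb.Length > 0)
    {
        yield return sb.ToString();
    }
}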
Now I'm wondering if there is an idiomatic way for this kind of operation in Rx?
It's exactly the same code as with Enumerables. You use Subscribe instead of a foreach loop, and you use observer.OnNext instead of yield return. Oh, and you have to use Observable.Create, because C# doesn't have language support for Observers like it does for Enumerables (yet, and that's not a failing of Rx).
Enumerables and Observables are exactly the same thing. One pushes, the other pulls. Creating them has a slightly different syntax. That's all.
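public static IObservable<string> ToNmeaSentence(
    this IObservable<char> characters
)
{
    return Observable.Create<string>(observer => {
        var sb = new StringBuilder();
        return characters.Subscribe(
            ch => {
                // A '$' starts a new sentence, so emit whatever was buffered so far.
                if (ch == '$' && sb.Length > 0)
                {
                    observer.OnNext(sb.ToString());
                    sb.Clear();
                }
                sb.Append(ch);
            },
            // Forward source errors to the observer.
            observer.OnError,
            () => {
                // Flush the last buffered sentence, then forward completion.
                if (sb.Length > 0)
                {
                    observer.OnNext(sb.ToString());
                }
                observer.OnCompleted();
            });
    });
}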
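The serialPortSource in the question is an IObservable<string>, so the chunks have to be flattened into characters before this operator can be applied. Here is a minimal sketch of that wiring, assuming the System.Reactive package is referenced. The NmeaDemo class and the in-memory chunks array are hypothetical stand-ins for the real serial port, and the operator is repeated inside the sketch (with error and completion forwarding) only so that it compiles on its own:

```csharp
using System;
using System.Reactive.Linq;
using System.Text;

public static class NmeaDemo
{
    // Same splitting logic as the operator above, repeated to keep the sketch self-contained.
    public static IObservable<string> ToNmeaSentence(this IObservable<char> characters) =>
        Observable.Create<string>(observer =>
        {
            var sb = new StringBuilder();
            return characters.Subscribe(
                ch =>
                {
                    if (ch == '$' && sb.Length > 0)
                    {
                        observer.OnNext(sb.ToString());
                        sb.Clear();
                    }
                    sb.Append(ch);
                },
                observer.OnError,
                () =>
                {
                    if (sb.Length > 0)
                    {
                        observer.OnNext(sb.ToString());
                    }
                    observer.OnCompleted();
                });
        });

    public static void Main()
    {
        // Hypothetical stand-in for serialPortSource: chunks that do not line up with sentences.
        var chunks = new[]
        {
            "$[data for first line g",
            "oes here]\r\n$[data for ",
            "second line goes here]\r\n"
        }.ToObservable();

        chunks
            .SelectMany(chunk => chunk.ToCharArray()) // IObservable<string> -> IObservable<char>
            .ToNmeaSentence()
            .Select(sentence => sentence.TrimEnd())   // drop the trailing "\r\n"
            .Subscribe(Console.WriteLine);
    }
}
```

With the real port, serialPortSource would take the place of chunks. Since that stream never completes, the sentence currently being received stays in the buffer until the next '$' arrives.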
I don't usually program at this low a level, but Observables don't complicate it any more than Enumerables do. When people first learn Enumerables, it's hard to understand. When people first learn Observables, it's hard to understand. Both of them do the same thing, but one pushes and one pulls. There's a 1-1 relationship between the two aside from that one distinction.
You would be wrong to think Rx is any more complicated than Enumerables and LINQ to Objects. It just appears that way when you're still learning it.