
How to use Reactive Extensions to parse a stream of characters from a serial port?

I need to parse a stream of serial data coming from a test instrument, and this seems to be an excellent application for Reactive Extensions.

The protocol is very simple: each "packet" is a single letter followed by numeric digits. The number of digits is fixed for each packet type, but can vary between packet types, e.g.

...A1234B123456C12...

I am trying to break this up into an Observable of Strings, e.g.

"A1234" "B123456" "C12" ...

I thought this would be simple, but I don't see an obvious way to approach it (I have some experience with LINQ, but am new to Rx).

Here's the code I have so far, which produces an Observable of chars from the serial port's DataReceived event.

        var serialData = Observable
            .FromEventPattern<SerialDataReceivedEventArgs>(SerialPortMain, "DataReceived")
            .SelectMany(_ =>
            {
                int dataLength = SerialPortMain.BytesToRead;
                byte[] data = new byte[dataLength];
                int nbrDataRead = SerialPortMain.Read(data, 0, dataLength);

                if (nbrDataRead == 0)
                    return new char[0];

                // Decode only the bytes actually read; Read may return fewer than requested.
                return Encoding.ASCII.GetChars(data, 0, nbrDataRead);
            });

How can I transform serialData into an Observable of Strings, where each string is a packet?

Tom Bushell asked Jun 24 '14 19:06


1 Answer

Here's a slightly shorter method, in the same style as James' first solution, with a similar helper method:

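public static bool IsCompletePacket(string s)
{
    // Total length (letter + digits) per packet type, matching the
    // example stream in the question: A1234, B123456, C12.
    switch (s[0])
    {
        case 'A':
            return s.Length == 5;
        case 'B':
            return s.Length == 7;
        case 'C':
            return s.Length == 3;
        default:
            // Thrown inside Where, this surfaces as OnError on the observable,
            // e.g. if the stream starts mid-packet or an unknown letter arrives.
            throw new ArgumentException("Packet must begin with a known packet letter");
    }
}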

The code is then (where chars is the Observable of chars, called serialData in the question):

var packets = chars
    .Scan(string.Empty, (prev, cur) => char.IsLetter(cur) ? cur.ToString() : prev + cur)
    .Where(IsCompletePacket);

The Scan part builds up a string one character at a time, starting over whenever a letter arrives, e.g.:

A
A1
A12
A123
...

The Where then selects only those strings that have the correct length for their packet type. Essentially, it removes the Tuple from James' solution and uses the string length instead.
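
To try the Scan/Where idea without a serial port, here is a minimal sketch that feeds the example stream from the question through the same pipeline. It assumes the System.Reactive NuGet package is installed. The class name PacketDemo and the helper ToPackets are hypothetical names used only for illustration, and the packet lengths are taken from the example stream (A1234, B123456, C12) rather than from any real instrument specification. Unlike the helper above, this version returns false for an unrecognised first character instead of throwing, so a stream that starts mid-packet is dropped rather than ending in OnError.

```csharp
using System;
using System.Reactive.Linq; // from the System.Reactive NuGet package (assumed installed)

public static class PacketDemo
{
    // Total packet lengths (letter + digits), assumed from the example stream in the question.
    public static bool IsCompletePacket(string s)
    {
        switch (s[0])
        {
            case 'A': return s.Length == 5;
            case 'B': return s.Length == 7;
            case 'C': return s.Length == 3;
            default: return false; // unknown type or partial leading packet: drop it
        }
    }

    // Hypothetical helper wrapping the Scan/Where pipeline.
    public static IObservable<string> ToPackets(IObservable<char> chars)
    {
        return chars
            // Restart the accumulated string on every letter, otherwise append the digit.
            .Scan(string.Empty, (prev, cur) => char.IsLetter(cur) ? cur.ToString() : prev + cur)
            // Keep only the accumulated strings that have reached their full length.
            .Where(IsCompletePacket);
    }

    public static void Main()
    {
        // A string is an IEnumerable<char>, so it can stand in for the serial port here.
        ToPackets("A1234B123456C12".ToObservable())
            .Subscribe(packet => Console.WriteLine(packet));
    }
}
```

Run as a console program, this should print A1234, B123456 and C12 on separate lines. In the real application, serialData from the question would be passed to ToPackets in place of the string.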

Matthew Finlay answered Sep 17 '22 15:09