Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Observable Network IO Parsing

I am trying to use Rx to read from a TCPClient receive stream and parse the data into an IObservable of string, delimited by newline "\r\n" The following is how I'm receiving from the socket stream...

var messages = new Subject<string>();

var functionReceiveSocketData =
            Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
            (client.Client.BeginReceive, client.Client.EndReceive);

Func<byte[], int, byte[]> copy = (bs, n) =>
        {
            var rs = new byte[buffer.Length];
            bs.CopyTo(rs, 0);
            return rs;
        };

Observable
    .Defer(() =>
            {
                var buffer = new byte[50];
                return
                    from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None)
                select copy(buffer, n);
            }).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x)));

Here is what I came up with to parse the string. This currently does not work...

obsStrings = messages.Buffer<string,string>(() =>  
                messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
            );

The message subject receives the message in chunks so I'm trying to concat them and test whether the concatenated string contains newline, thus signaling the buffer to close and output the buffered chunks. Not sure why it isn't working. Seems that I only get the first chunk out of obsStrings.

So I am looking for two things. I'd like to simplify reading of the io stream and eliminate the use of the messages subject. Secondly, I'd like to get my string parsing working. I have been hacking on this for a bit and cannot come up with a working solution. I am a beginner with Rx.

EDIT: Here is the finished product after the problem was solved....

var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None)
            .SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray())
            .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
            .Where(x => x.EndsWith("\r\n"))
            .Select(buffered => String.Join("", buffered))
            .Select(a => a.Replace("\n", ""));

"ReceiveUntilCompleted" is an extension from the RXX project.

like image 796
TK3 Avatar asked May 01 '12 19:05

TK3


2 Answers

messages
    .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
    .Where(x => x.EndsWith("\r\n"))
like image 127
ronag Avatar answered Nov 13 '22 04:11

ronag


Instead of Subscribe and using the Subject, can you try just Select:

.Repeat().Select(x => System.Text.Encoding.UTF8.GetString(x));

Now assuming this has all gone into a new observable called messages, your next problem is that in this line

var obsStrings = messages.Buffer<string,string>(() =>  
                messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
            );

You're using both Buffer and Scan, and are trying to do the same thing in both! Note that Buffer needs a closing selector.

What you really want is:

var obsStrings = messages.Buffer(() => messages.Where(x => x.Contains("\r\n")))
                         .Select(buffered => String.Join(buffered));

Which gives Buffered an observable regarding when to close the window (when it contains \r\n) and gives Select the buffered amount to concatenate. This results in a new observable of your split strings.

One issue is that you can still have the new line in the middle of a chunk and this will cause problems. One simple idea is to observe on the characters rather than full string chunks, such as:

obsStrings.Repeat().SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray().ToObservable());

Then you can do messages.Where(c => c != '\r') to skip \r and change the buffer to:

var obsStrings = messages.Buffer(() => messages.Where(x => x == '\n')))
                         .Select(buffered => String.Join("", buffered));
like image 41
yamen Avatar answered Nov 13 '22 06:11

yamen