I am trying to use Rx to read from a TcpClient receive stream and parse the data into an IObservable of string, delimited by the newline sequence "\r\n". The following is how I'm receiving from the socket stream...
var messages = new Subject<string>();
var functionReceiveSocketData =
    Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
        (client.Client.BeginReceive, client.Client.EndReceive);
// Copy only the n bytes that were actually received.
Func<byte[], int, byte[]> copy = (bs, n) =>
{
    var rs = new byte[n];
    Array.Copy(bs, rs, n);
    return rs;
};
Observable
    .Defer(() =>
    {
        var buffer = new byte[50];
        return
            from n in functionReceiveSocketData(buffer, 0, buffer.Length, SocketFlags.None)
            select copy(buffer, n);
    }).Repeat().Subscribe(x => messages.OnNext(System.Text.Encoding.UTF8.GetString(x)));
Here is what I came up with to parse the string. This currently does not work...
obsStrings = messages.Buffer<string,string>(() =>
    messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
);
The messages subject receives the data in chunks, so I'm trying to concatenate them and test whether the concatenated string contains a newline, which should signal the buffer to close and output the buffered chunks. I'm not sure why it isn't working; it seems that I only get the first chunk out of obsStrings.
So I am looking for two things. First, I'd like to simplify reading of the I/O stream and eliminate the use of the messages subject. Second, I'd like to get my string parsing working. I have been hacking on this for a bit and cannot come up with a working solution. I am a beginner with Rx.
EDIT: Here is the finished product after the problem was solved....
var receivedStrings = socket.ReceiveUntilCompleted(SocketFlags.None)
    .SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray())
    .Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
    .Where(x => x.EndsWith("\r\n"))
    .Select(buffered => String.Join("", buffered))
    .Select(a => a.Replace("\n", ""));
"ReceiveUntilCompleted" is an extension from the RXX project.
messages
.Scan(String.Empty, (a, b) => (a.EndsWith("\r\n") ? "" : a) + b)
.Where(x => x.EndsWith("\r\n"))
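The Scan/Where pattern above can be tried without a socket. The following is a minimal sketch, assuming the System.Reactive package is referenced; the class name ScanLineDemo, the method ToLines, and the sample chunks are made up for illustration. It feeds a few simulated reads through the same character-by-character accumulation and strips the trailing "\r\n" from each completed line.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Text;

// Hypothetical demo class; only the Rx operators come from System.Reactive.
public static class ScanLineDemo
{
    // Turns a sequence of byte chunks into "\r\n"-terminated lines.
    public static IObservable<string> ToLines(IObservable<byte[]> chunks)
    {
        return chunks
            // Flatten every decoded chunk into individual characters.
            .SelectMany(bytes => Encoding.UTF8.GetString(bytes).ToCharArray())
            // Grow the accumulator one character at a time, starting over
            // once the previous value was a complete line.
            .Scan(string.Empty, (acc, c) =>
                (acc.EndsWith("\r\n", StringComparison.Ordinal) ? string.Empty : acc) + c)
            // Let only complete lines through.
            .Where(s => s.EndsWith("\r\n", StringComparison.Ordinal))
            // Drop the two delimiter characters.
            .Select(s => s.Substring(0, s.Length - 2));
    }

    public static void Main()
    {
        // Simulated socket reads; the first delimiter sits in the middle of a chunk.
        var chunks = new[] { "hel", "lo\r\nwor", "ld\r\n" }
            .Select(s => Encoding.UTF8.GetBytes(s))
            .ToObservable();

        // ToEnumerable blocks until the observable completes.
        foreach (var line in ToLines(chunks).ToEnumerable())
        {
            Console.WriteLine(line); // prints "hello", then "world"
        }
    }
}
```

The ordinal comparison is a deliberate choice here: it compares the characters exactly and avoids culture-dependent matching of the line-break sequence.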
Instead of Subscribe and using the Subject, can you try just Select:
.Repeat().Select(x => System.Text.Encoding.UTF8.GetString(x));
Now assuming this has all gone into a new observable called messages, your next problem is in this line:
var obsStrings = messages.Buffer<string,string>(() =>
messages.Scan((a, c) => a + c).SkipWhile(a => !a.Contains("\r\n"))
);
You're using both Buffer and Scan, and are trying to do the same thing in both! Note that Buffer needs a closing selector.
What you really want is:
var obsStrings = messages.Buffer(() => messages.Where(x => x.Contains("\r\n")))
    .Select(buffered => String.Join("", buffered));
This gives Buffer an observable that signals when to close the window (when a chunk contains \r\n) and gives Select the buffered chunks to concatenate. The result is a new observable of your split strings.
One issue is that you can still have the new line in the middle of a chunk and this will cause problems. One simple idea is to observe on the characters rather than full string chunks, such as:
.Repeat().SelectMany(x => System.Text.Encoding.UTF8.GetString(x).ToCharArray().ToObservable());
Then you can do messages.Where(c => c != '\r') to skip \r and change the buffer to:
var obsStrings = messages.Buffer(() => messages.Where(x => x == '\n'))
    .Select(buffered => String.Join("", buffered));
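The mid-chunk newline problem mentioned above can also be handled without going character by character, by carrying the unfinished tail of each chunk over to the next one. The following is a rough sketch in plain C# with no Rx dependency; LineSplitter and Push are hypothetical names invented for this example, not part of Rx or Rxx.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper, not part of Rx: keeps the unfinished tail of one
// chunk so that a delimiter may fall anywhere, even across two chunks.
public sealed class LineSplitter
{
    private readonly StringBuilder _pending = new StringBuilder();
    private readonly string _delimiter;

    public LineSplitter(string delimiter = "\r\n")
    {
        _delimiter = delimiter;
    }

    // Appends a chunk and returns every line it completed, without delimiters.
    public IReadOnlyList<string> Push(string chunk)
    {
        _pending.Append(chunk);
        var text = _pending.ToString();
        var lines = new List<string>();
        int start = 0;
        int end;
        while ((end = text.IndexOf(_delimiter, start, StringComparison.Ordinal)) >= 0)
        {
            lines.Add(text.Substring(start, end - start));
            start = end + _delimiter.Length;
        }
        // Keep whatever follows the last delimiter for the next call.
        _pending.Clear();
        _pending.Append(text, start, text.Length - start);
        return lines;
    }
}

public static class LineSplitterDemo
{
    public static void Main()
    {
        var splitter = new LineSplitter();
        foreach (var chunk in new[] { "hel", "lo\r\nwor", "ld\r", "\nabc\r\n" })
        {
            foreach (var line in splitter.Push(chunk))
            {
                Console.WriteLine(line); // prints "hello", "world", "abc"
            }
        }
    }
}
```

With Rx, something like messages.SelectMany(chunk => splitter.Push(chunk)) should plug this into the pipeline. Because the splitter holds state, it would need to be created once per subscription (for example inside Observable.Defer) rather than shared.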