As part of our application (in production for about 4 months now) we have a stream of data coming from an external device that we convert to an IObservable<string>.
Up until now we've been using the following to generate it, and it's been working quite well.
    IObservable<string> ObserveStringStream(Stream inputStream)
    {
        var streamReader = new StreamReader(inputStream);
        return Observable
            .Create<string>(observer => Scheduler.ThreadPool
                .Schedule(() => ReadLoop(streamReader, observer)));
    }

    private void ReadLoop(StreamReader reader, IObserver<string> observer)
    {
        while (true)
        {
            try
            {
                var line = reader.ReadLine();
                if (line != null)
                {
                    observer.OnNext(line);
                }
                else
                {
                    observer.OnCompleted();
                    break;
                }
            }
            catch (Exception ex)
            {
                observer.OnError(ex);
                break;
            }
        }
    }
Last night I wondered if there was a way to use the yield return syntax to achieve the same result and came up with this:
    IObservable<string> ObserveStringStream(Stream inputStream)
    {
        var streamReader = new StreamReader(inputStream);
        return ReadLoop(streamReader)
            .ToObservable(Scheduler.ThreadPool);
    }

    private IEnumerable<string> ReadLoop(StreamReader reader)
    {
        while (true)
        {
            var line = reader.ReadLine();
            if (line != null)
            {
                yield return line;
            }
            else
            {
                yield break;
            }
        }
    }
It seems to work quite well and it's much cleaner, but I was wondering if there were any pros or cons for one way over the other, or if there was a better way entirely.
I think you've got a good idea there (turn the Stream into an IEnumerable<string>, then into an IObservable<string>). However, the IEnumerable code can be cleaner:
    IEnumerable<string> ReadLines(Stream stream)
    {
        using (StreamReader reader = new StreamReader(stream))
        {
            while (!reader.EndOfStream)
                yield return reader.ReadLine();
        }
    }
And then for the IObservable:
    IObservable<string> ObserveLines(Stream inputStream)
    {
        return ReadLines(inputStream).ToObservable(Scheduler.ThreadPool);
    }
This is shorter, more readable, and properly disposes of the reader (and with it the stream). It's also lazy: nothing is read until someone subscribes.
The ToObservable extension takes care of calling OnNext for each new line, OnCompleted at the end of the enumerable, and OnError if the enumeration throws.
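For illustration, here is a minimal sketch of how a subscriber would see those three notifications. It assumes a recent System.Reactive package, where ThreadPoolScheduler.Instance is used in place of the older Scheduler.ThreadPool property; LineDemo and PrintLines are hypothetical names, and a MemoryStream can stand in for the device stream.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

static class LineDemo
{
    static IEnumerable<string> ReadLines(Stream stream)
    {
        using (var reader = new StreamReader(stream))
        {
            while (!reader.EndOfStream)
                yield return reader.ReadLine();
        }
    }

    public static IObservable<string> ObserveLines(Stream stream)
    {
        // Assumption: ThreadPoolScheduler.Instance replaces Scheduler.ThreadPool here.
        return ReadLines(stream).ToObservable(ThreadPoolScheduler.Instance);
    }

    // Hypothetical helper: prints every notification and blocks until the
    // sequence terminates with either OnCompleted or OnError.
    public static void PrintLines(Stream stream)
    {
        var done = new ManualResetEventSlim();
        using (ObserveLines(stream).Subscribe(
            line => Console.WriteLine("line: " + line),
            ex => { Console.WriteLine("error: " + ex.Message); done.Set(); },
            () => { Console.WriteLine("completed"); done.Set(); }))
        {
            done.Wait();
        }
    }
}
```

Because the enumerable is lazy, the reader is only created once Subscribe is called, and the using block inside ReadLines runs when the enumeration ends or the subscription is disposed.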
I don't have the code to hand, but here's how to do it asynchronously, pre-Async CTP.
[Note for skim-readers: no need to bother if you don't need to scale much]
Create an AsyncTextReader implementation, that is itself Observable. The ctor takes in a Stream, and performs a BeginRead(256bytes) on the stream, passing itself as the continuation, then returning.
When the continuation is entered, call EndRead, and add the returned bytes into a little buffer on the class. Repeat this till the buffer contains one or more end-of-line sequences (as per TextWriter). When this happens, send those bit(s) of the buffer out as a string via the Observable interface, and repeat.
When you've finished, signal OnCompleted etc... (and dispose the stream). If you get an exception thrown from the EndRead in your continuation, catch it and pass it out via OnError.
Calling code then looks like:
    IObservable<string> lines = new AsyncTextReader(stream);
This scales well. Just need to make sure you don't do anything too dumb with the buffer handling.
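As a rough sketch of that buffer handling (not the original implementation): the hypothetical LineBuffer class below takes the bytes from each completed read and returns whatever full lines they finish. It assumes lines end in \n or \r\n, and it uses a Decoder so a multi-byte character split across two reads still decodes correctly.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper, not part of any library: accumulates decoded text
// across reads and hands back each complete line.
sealed class LineBuffer
{
    private readonly Decoder _decoder;
    private readonly StringBuilder _pending = new StringBuilder();

    public LineBuffer(Encoding encoding)
    {
        // A Decoder keeps state between calls, unlike Encoding.GetString.
        _decoder = encoding.GetDecoder();
    }

    // Feed the first 'count' bytes of one read; returns the lines they complete.
    public List<string> Append(byte[] bytes, int count)
    {
        var chars = new char[_decoder.GetCharCount(bytes, 0, count)];
        int charCount = _decoder.GetChars(bytes, 0, count, chars, 0);
        _pending.Append(chars, 0, charCount);

        var lines = new List<string>();
        int start = 0;
        for (int i = 0; i < _pending.Length; i++)
        {
            if (_pending[i] != '\n') continue;
            // Drop a preceding '\r' so "\r\n" and "\n" behave the same.
            int end = (i > start && _pending[i - 1] == '\r') ? i - 1 : i;
            lines.Add(_pending.ToString(start, end - start));
            start = i + 1;
        }
        _pending.Remove(0, start); // keep only the unfinished tail
        return lines;
    }

    // Call at end of stream: returns the unterminated last line, or null.
    public string Flush()
    {
        if (_pending.Length == 0) return null;
        var rest = _pending.ToString();
        _pending.Clear();
        return rest;
    }
}
```

In the continuation you would call Append(_buffer, bytesRead), push each returned line through OnNext, and call Flush before OnCompleted. This version rescans the pending text on every call, which is fine for short lines but worth tightening if lines can get very long.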
Pseudocode:
    public ctor(Stream stream){
        this._stream = stream;
        BeginRead();
        return;
    }

    private void BeginRead(){
        // kick off async read and return (synchronously)
        this._stream.BeginRead(_buffer, 0, 256, EndRead, this);
    }

    private void EndRead(IAsyncResult result){
        try{
            // bytesRead will be *up to* 256
            var bytesRead = this._stream.EndRead(result);
            if(bytesRead < 1){
                OnCompleted();
                return;
            }
            // do work with _buffer, _listOfBuffers
            // to get lines out etc...
            OnNext(aLineIFound); // times n
            BeginRead(); // go round again
        }catch(Exception err){
            OnError(err);
        }
    }
Ok, this is the APM, and something only a mother could love. I keenly Await the alternative.
ps: whether the reader should close the stream is an interesting question. I say no, because it didn't create it.
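For comparison, a sketch of what the awaited alternative might look like, assuming .NET 4.5 or later and a System.Reactive version that has the Task-based Observable.Create overload. AsyncLines and ObserveLinesAsync are hypothetical names. In line with the PS above, the reader is created with leaveOpen set so it does not close a stream it didn't create.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

static class AsyncLines
{
    public static IObservable<string> ObserveLinesAsync(Stream stream)
    {
        // With this overload, OnCompleted is sent when the task finishes and
        // OnError when it faults, so neither is called explicitly here.
        return Observable.Create<string>(async (observer, cancellationToken) =>
        {
            // leaveOpen: true, because the caller owns the stream.
            using (var reader = new StreamReader(stream, Encoding.UTF8, true, 1024, leaveOpen: true))
            {
                string line;
                // The token is signalled when the subscription is disposed.
                while (!cancellationToken.IsCancellationRequested
                       && (line = await reader.ReadLineAsync()) != null)
                {
                    observer.OnNext(line);
                }
            }
        });
    }
}
```

The encoding (UTF-8) and buffer size (1024) are placeholders; the stream is read from the start on each subscription, so this only suits a single subscriber per stream.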