 

Preferred method for generating an IObservable<String> from a Stream

As part of our application (in production for about 4 months now), we have a stream of data coming from an external device that we convert to an IObservable<string>.

Up until now we've been using the following to generate it, and it's been working quite well.

IObservable<string> ObserveStringStream(Stream inputStream)
{
    var streamReader = new StreamReader(inputStream);
    return Observable
            .Create<string>(observer => Scheduler.ThreadPool
            .Schedule(() => ReadLoop(streamReader, observer)));
}

private void ReadLoop(StreamReader reader, IObserver<string> observer)
{
    while (true)
    {
        try
        {
            var line = reader.ReadLine();
            if (line != null)
            {
                observer.OnNext(line);
            }
            else
            {
                observer.OnCompleted();
                break;
            }
        }
        catch (Exception ex)
        {
            observer.OnError(ex);
            break;
        }
    }
}

Last night I wondered if there was a way to use the yield return syntax to achieve the same result and came up with this:

IObservable<string> ObserveStringStream(Stream inputStream)
{
    var streamReader = new StreamReader(inputStream);
    return ReadLoop(streamReader)
            .ToObservable(Scheduler.ThreadPool);
}

private IEnumerable<string> ReadLoop(StreamReader reader)
{
    while (true)
    {
        var line = reader.ReadLine();
        if (line != null)
        {
            yield return line;
        }
        else
        {
            yield break;
        }
    }
}

It seems to work quite well and it's much cleaner, but I was wondering if there were any pros or cons for one way over the other, or if there was a better way entirely.

asked Apr 12 '12 at 02:04 by baralong


2 Answers

I think you've got a good idea there (turn the Stream into an IEnumerable<string>, then into an IObservable<string>). However, the IEnumerable code can be cleaner:

IEnumerable<string> ReadLines(Stream stream)
{
    using (StreamReader reader = new StreamReader(stream))
    {
        while (!reader.EndOfStream)
            yield return reader.ReadLine();
    }
}

And then for the IObservable:

IObservable<string> ObserveLines(Stream inputStream)
{
    return ReadLines(inputStream).ToObservable(Scheduler.ThreadPool);
}

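This is shorter and more readable, and it properly disposes of the StreamReader (and with it the underlying stream) once enumeration finishes or the enumerator is disposed. It's also lazy: nothing is read from the stream until something enumerates the sequence.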
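A quick way to see both properties, laziness and disposal, is to run the same ReadLines over a MemoryStream. This is a minimal sketch using only the BCL; the class and method names (ReadLinesDisposalDemo, Run) are made up for illustration. Pulling just the first line is enough: First() disposes the enumerator, which runs the using block and closes the stream.

```csharp
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

static class ReadLinesDisposalDemo
{
    // Same shape as ReadLines above: the using block's Dispose runs when the
    // iterator finishes or when the consumer disposes the enumerator early.
    public static IEnumerable<string> ReadLines(Stream stream)
    {
        using (StreamReader reader = new StreamReader(stream))
        {
            while (!reader.EndOfStream)
                yield return reader.ReadLine();
        }
    }

    public static (bool openBeforeEnumerating, string firstLine, bool openAfterFirst) Run()
    {
        var stream = new MemoryStream(Encoding.UTF8.GetBytes("alpha\nbeta\ngamma\n"));

        // Lazy: calling ReadLines only builds the iterator; no StreamReader exists yet.
        IEnumerable<string> lines = ReadLines(stream);
        bool openBefore = stream.CanRead;

        // First() pulls one line and then disposes the enumerator,
        // which disposes the StreamReader and therefore the MemoryStream.
        string first = lines.First();
        bool openAfter = stream.CanRead;

        return (openBefore, first, openAfter);
    }
}
```

Run() should report that the stream was still open before enumeration, that the first line is "alpha", and that the stream is closed afterwards, even though "beta" and "gamma" were never read.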

The ToObservable extension takes care of calling OnNext for each new line, OnCompleted at the end of the enumerable, and OnError if the enumeration throws.
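To check the OnError part, here is a small sketch assuming the System.Reactive NuGet package. It uses Scheduler.Default, since newer Rx releases steer you toward Scheduler.Default or ThreadPoolScheduler.Instance rather than Scheduler.ThreadPool. LinesThenFailure is a made-up source standing in for a device that drops out mid-read; Run blocks until the sequence terminates and reports what the subscriber saw.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

static class ToObservableErrorDemo
{
    // Made-up source: two lines, then a failure part-way through enumeration.
    static IEnumerable<string> LinesThenFailure()
    {
        yield return "first";
        yield return "second";
        throw new IOException("device disconnected");
    }

    public static (List<string> lines, Exception error, bool completed) Run()
    {
        var lines = new List<string>();
        Exception error = null;
        bool completed = false;

        using (var done = new ManualResetEventSlim())
        {
            // ToObservable enumerates on a pool thread and forwards each item to OnNext;
            // the exception thrown by MoveNext arrives at OnError instead of OnCompleted.
            IDisposable subscription = LinesThenFailure()
                .ToObservable(Scheduler.Default)
                .Subscribe(
                    line => lines.Add(line),
                    ex => { error = ex; done.Set(); },
                    () => { completed = true; done.Set(); });

            done.Wait();
            subscription.Dispose();
        }
        return (lines, error, completed);
    }
}
```

The subscriber should see "first" and "second", then the IOException, and never OnCompleted.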

answered Oct 12 '22 at 20:10 by yamen


I don't have the code to hand, but here's how to do it asynchronously without the Async CTP.

[Note for skim-readers: no need to bother if you don't need to scale much]

Create an AsyncTextReader class that itself implements IObservable<string>. The constructor takes a Stream, calls BeginRead (for, say, 256 bytes) on it, passing its own callback as the continuation, and then returns.

When the continuation is entered, call EndRead and append the returned bytes to a small buffer on the class. Repeat this until the buffer contains one or more end-of-line sequences (as TextWriter defines them). When that happens, send each complete line out as a string via the IObservable interface, and repeat.

When you've finished, signal OnCompleted etc. (and dispose the stream). If EndRead throws an exception in your continuation, catch it and pass it out through OnError.

Calling code then looks like:

IObservable<string> lines = new AsyncTextReader(stream);

This scales well, because no thread sits blocked waiting on the device between reads. You just need to make sure you don't do anything too dumb with the buffer handling.
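The buffer handling is where the subtle bugs live: a read can end in the middle of a line, or even in the middle of a multi-byte UTF-8 character. A minimal sketch of that step, using only the BCL, could look like this. LineAccumulator is a hypothetical helper (not part of any library); it assumes UTF-8 input and treats "\n" and "\r\n" as line endings.

```csharp
using System.Collections.Generic;
using System.Text;

// Hypothetical helper for the buffering step: feed it whatever bytes one
// EndRead returned; it hands back every complete line and keeps the rest.
sealed class LineAccumulator
{
    // A Decoder keeps state between calls, so a multi-byte UTF-8 character
    // split across two reads is still decoded correctly.
    private readonly Decoder _decoder = Encoding.UTF8.GetDecoder();
    private readonly StringBuilder _pending = new StringBuilder();

    public List<string> Append(byte[] buffer, int count)
    {
        // GetMaxCharCount allows for bytes left over in the decoder from the last call.
        var chars = new char[Encoding.UTF8.GetMaxCharCount(count)];
        int charCount = _decoder.GetChars(buffer, 0, count, chars, 0);
        _pending.Append(chars, 0, charCount);

        var lines = new List<string>();
        string text = _pending.ToString();
        int start = 0;
        for (int i = 0; i < text.Length; i++)
        {
            if (text[i] != '\n') continue;
            // Drop a '\r' that directly precedes the '\n' ("\r\n" endings).
            int end = (i > start && text[i - 1] == '\r') ? i - 1 : i;
            lines.Add(text.Substring(start, end - start));
            start = i + 1;
        }
        _pending.Remove(0, start);   // keep only the incomplete tail
        return lines;
    }

    // At end of stream, whatever is left is a final line with no terminator.
    public string Flush() => _pending.Length > 0 ? _pending.ToString() : null;
}
```

In the reader, each EndRead would pass _buffer and bytesRead to Append and call OnNext for every line returned; when EndRead returns 0, Flush gives you any last unterminated line before OnCompleted.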

Pseudocode:

public AsyncTextReader(Stream stream){
    this._stream = stream;
    BeginRead();
}

private void BeginRead(){
    // kick off an async read and return (synchronously)
    this._stream.BeginRead(_buffer, 0, 256, EndRead, this);
}

private void EndRead(IAsyncResult result){
    try{
        // bytesRead will be *up to* 256
        var bytesRead = this._stream.EndRead(result);
        if(bytesRead < 1){
            // end of stream
            OnCompleted();
            return;
        }
        // do work with _buffer, _listOfBuffers
        // to get lines out etc...
        OnNext(aLineIFound); // times n
        BeginRead(); // go round again
    }catch(Exception err){
        OnError(err);
    }
}

OK, this is the APM (Asynchronous Programming Model), and something only a mother could love. I keenly Await the alternative.
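With .NET 4.5 and Rx 2.0 or later, one such alternative exists: Observable.Create has an overload taking an async subscribe function plus a CancellationToken, and StreamReader has ReadLineAsync. This is a sketch assuming the System.Reactive package; AsyncLineObservable and ObserveLines are illustrative names. When the returned task completes, Rx signals OnCompleted; if it faults, OnError. The token is cancelled when the subscriber disposes its subscription, so cancellation here is best-effort.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Text;

static class AsyncLineObservable
{
    public static IObservable<string> ObserveLines(Stream stream)
    {
        return Observable.Create<string>(async (observer, cancellationToken) =>
        {
            // leaveOpen: true, because this code did not create the stream.
            using (var reader = new StreamReader(stream, Encoding.UTF8, true, 1024, leaveOpen: true))
            {
                string line;
                // The token is checked between lines; a pending ReadLineAsync
                // is not interrupted by it.
                while (!cancellationToken.IsCancellationRequested &&
                       (line = await reader.ReadLineAsync()) != null)
                {
                    observer.OnNext(line);
                }
            }
            // Returning normally completes the task, which Rx turns into OnCompleted;
            // an exception thrown above faults the task and becomes OnError.
        });
    }
}
```

Like the APM version, no thread is blocked while waiting for data; the await does the continuation plumbing that BeginRead/EndRead did by hand.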

PS: whether the reader should close the stream is an interesting question. I say no, because it didn't create it.
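If you go with "no", StreamReader can be told not to close the stream it wraps, via the constructor overload with a leaveOpen parameter (available since .NET 4.5). A small BCL-only sketch contrasting the two behaviours; StreamOwnershipDemo and CountLines are illustrative names.

```csharp
using System.IO;
using System.Text;

static class StreamOwnershipDemo
{
    // Counts the lines in a stream. With leaveOpen: true, disposing the reader
    // leaves the stream open, so whoever created it still owns (and disposes) it.
    // With leaveOpen: false (the default for the simpler constructors),
    // disposing the reader also closes the stream.
    public static int CountLines(Stream stream, bool leaveOpen)
    {
        int count = 0;
        using (var reader = new StreamReader(stream, Encoding.UTF8,
                   detectEncodingFromByteOrderMarks: true, bufferSize: 1024, leaveOpen: leaveOpen))
        {
            while (reader.ReadLine() != null)
                count++;
        }
        return count;
    }
}
```

With leaveOpen: true the caller keeps its own using block around the stream, which matches the "whoever created it closes it" rule.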

answered Oct 12 '22 at 22:10 by piers7