Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What is the proper way to create an Observable which reads a stream to the end

I'm struggling here. Normally I'd read a book but there aren't any yet. I've found countless examples of various things to do with reading streams using RX but I'm finding it very hard to get my head around.

I know I can use Observable.FromAsyncPattern to create a wrapper of the Stream's BeginRead/EndRead or BeginReadLine/EndReadLine methods.

But this only reads once -- when the first observer subscribes.

I want an Observable which will keep reading and pumping OnNext until the stream errors or ends.

In addition to this, I'd also like to know how I can then share that observable with multiple subscribers so they all get the items.

like image 584
NoPyGod Avatar asked Jan 22 '13 08:01

NoPyGod


People also ask

What is an observable stream?

An Observable is like a Stream (in many languages) and allows to pass 0, 1, or more events where the callback is called for each event. They deal with a sequence of asynchronous events.

What is a stream in RxJS?

RxJS provides of and from to convert single values, arrays, objects that emit events, and promises into observables. If your application converts something into an observable from inside the map operator, the next operator in your pipe will receive an observable, and you'll have a stream of streams.


1 Answers

You can use Repeat in order to keep reading lines until the end of the stream and Publish or Replay in order to control sharing across multiple readers.

An example of a simple, full Rx solution for reading lines from any stream until the end would be:

public static IObservable<string> ReadLines(Stream stream)
{
    return Observable.Using(
        () => new StreamReader(stream),
        reader => Observable.FromAsync(reader.ReadLineAsync)
                            .Repeat()
                            .TakeWhile(line => line != null));
}

This solution also takes advantage of the fact that ReadLine returns null when the end of the stream is reached.

like image 139
glopes Avatar answered Oct 13 '22 23:10

glopes