I'm struggling here. Normally I'd read a book, but there aren't any yet. I've found countless examples of reading streams using Rx, but I'm finding it very hard to get my head around them.
I know I can use Observable.FromAsyncPattern to create a wrapper of the Stream's BeginRead/EndRead or BeginReadLine/EndReadLine methods.
But this only reads once -- when the first observer subscribes.
I want an Observable which will keep reading and pumping OnNext until the stream errors or ends.
In addition to this, I'd also like to know how I can then share that observable with multiple subscribers so they all get the items.
An Observable is like a Stream (in many languages) and can deliver zero, one, or more events, with the callback invoked for each event. Observables deal with a sequence of asynchronous events.
RxJS provides of and from to convert single values, arrays, objects that emit events, and promises into observables. If your application converts something into an observable from inside the map operator, the next operator in your pipe will receive an observable, and you'll have a stream of streams.
You can use Repeat to keep reading lines until the end of the stream, and Publish or Replay to control sharing across multiple readers.
An example of a simple, full Rx solution for reading lines from any stream until the end would be:
public static IObservable<string> ReadLines(Stream stream)
{
    return Observable.Using(
        () => new StreamReader(stream),
        reader => Observable.FromAsync(reader.ReadLineAsync)
                            .Repeat()
                            .TakeWhile(line => line != null));
}
This solution also takes advantage of the fact that ReadLineAsync, like ReadLine, returns null when the end of the stream is reached.
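To share the lines with several subscribers, one option is to wrap the sequence in Publish and call Connect only after every observer has subscribed. The following is a minimal sketch rather than a definitive implementation. It assumes the System.Reactive NuGet package, and the MemoryStream, the sample text, and the "A"/"B" labels are made up for the demonstration.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Text;
using System.Threading.Tasks;

public static class Program
{
    // Same idea as the ReadLines helper in the answer, with an explicit lambda.
    public static IObservable<string> ReadLines(Stream stream)
    {
        return Observable.Using(
            () => new StreamReader(stream),
            reader => Observable.FromAsync(() => reader.ReadLineAsync())
                                .Repeat()
                                .TakeWhile(line => line != null));
    }

    public static async Task Main()
    {
        // Hypothetical in-memory input standing in for a real stream.
        var stream = new MemoryStream(Encoding.UTF8.GetBytes("alpha\nbeta\ngamma\n"));

        // Publish makes the sequence connectable: nothing is read until Connect is called.
        var lines = ReadLines(stream).Publish();

        // Both observers attach before any reading starts, so neither misses a line.
        lines.Subscribe(line => Console.WriteLine($"A: {line}"));
        lines.Subscribe(line => Console.WriteLine($"B: {line}"));

        // Subscribe a completion task before connecting, then start the single shared read loop.
        Task done = lines.LastOrDefaultAsync().ToTask();
        lines.Connect();

        // Wait until the stream has been read to the end.
        await done;
    }
}
```

With Publish, an observer that subscribes after Connect only sees the lines read from that point on. If late subscribers should also receive earlier lines, Replay can be used in place of Publish, at the cost of buffering them in memory.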