I have a long-running async method that reads a stream and fires an event when it finds something:
public static async void GetStream(int id, CancellationToken token)
It takes a cancellation token because it is created in a new task. Internally it uses await when it reads from the stream:
var result = await sr.ReadLineAsync()
Now, I want to convert this to a method that returns an IObservable<> so that I can use it with the Reactive Extensions. From what I've read, the best way to do this is with Observable.Create, and since Rx 2.0 now also supports async, I can get it all to work with something like this:
public static IObservable<Message> ObservableStream(int id, CancellationToken token)
{
    return Observable.Create<Message>(
        async (IObserver<Message> observer) =>
        {
The rest of the code inside is the same, but instead of firing events I'm calling observer.OnNext(). But this feels wrong. For one thing, I'm mixing CancellationTokens in there, and although adding the async keyword made it work, is this actually the best thing to do? I'm calling my ObservableStream like this:
Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m));
You are correct. Once you represent your interface through an IObservable, you should avoid requiring the callers to supply a CancellationToken. That doesn't mean you cannot use them internally. Rx provides several mechanisms to produce CancellationToken instances which are canceled when the observer unsubscribes from your observable.
There are a number of ways to tackle your problem. The simplest requires almost no changes in your code. It uses an overload of Observable.Create which supplies you with a CancellationToken that triggers if the caller unsubscribes:
public static IObservable<Message> ObservableStream(int id)
{
    return Observable.Create<Message>(async (observer, token) =>
    {
        // No exception handling required. If this method throws,
        // Rx will catch it and call observer.OnError() for us.
        using (var stream = /*...open your stream...*/)
        {
            string msg;
            while ((msg = await stream.ReadLineAsync()) != null)
            {
                // Stop as soon as the subscriber has unsubscribed.
                if (token.IsCancellationRequested) { return; }
                // msg is a string here; convert it to a Message
                // in whatever way your code requires before passing it on.
                observer.OnNext(msg);
            }
            observer.OnCompleted();
        }
    });
}
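To illustrate how the caller side changes, here is a minimal sketch of the same pattern with no token in the public signature. The names LineStreams, ObservableLines, and Demo are hypothetical, and an in-memory StringReader stands in for the real stream, which the question does not show. It assumes the System.Reactive package is referenced.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;

public static class LineStreams
{
    // Hypothetical helper: 'open' stands in for however the real stream is opened.
    public static IObservable<string> ObservableLines(Func<TextReader> open)
    {
        return Observable.Create<string>(async (observer, token) =>
        {
            using (var reader = open())
            {
                string line;
                // Keep reading until the source ends or the subscriber unsubscribes.
                while (!token.IsCancellationRequested
                       && (line = await reader.ReadLineAsync()) != null)
                {
                    observer.OnNext(line);
                }
            }
            observer.OnCompleted();
        });
    }

    public static void Demo()
    {
        IDisposable subscription =
            ObservableLines(() => new StringReader("one\ntwo\nthree"))
                .Subscribe(Console.WriteLine);

        // Disposing the subscription is what cancels the token passed to Create.
        // With an in-memory reader all lines arrive before Subscribe returns,
        // so this only matters for a slow source such as a network stream.
        subscription.Dispose();
    }
}
```

The caller never sees a CancellationToken: unsubscribing by disposing the subscription takes its place.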
You should change GetStream to return a Task instead of void (async void methods should be avoided except when absolutely required, as svick commented). Once it returns a Task, you can just call .ToObservable() and you are done.
For example:
public static async Task<int> GetStream(int id, CancellationToken token) { ... }
Then,
GetStream(1, new CancellationToken(false))
.ToObservable()
.Subscribe(Console.Write);
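One thing worth keeping in mind with this approach: an observable built from a Task<T> produces the task's single result and then completes, so it reports when the method finishes rather than each line as it is read. A minimal sketch, assuming the System.Reactive package and a hypothetical CountAsync method standing in for GetStream:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class TaskBridge
{
    // Hypothetical stand-in for GetStream: does some async work and returns one value.
    public static async Task<int> CountAsync(int id, CancellationToken token)
    {
        await Task.Delay(10, token);
        return id * 2;
    }

    public static void Demo()
    {
        // OnNext fires once with the task's result, then OnCompleted fires.
        CountAsync(21, CancellationToken.None)
            .ToObservable()
            .Subscribe(
                value => Console.WriteLine("result: " + value),
                () => Console.WriteLine("completed"));
    }
}
```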