I've been working on creating an IObservable<T> implementation using the Reactive Extensions for Twitter's streaming APIs.
From a high level, an HTTP request is sent and the connection is kept open. Length-prefixed items are then sent over that connection for the client to consume.
Basically, it's a call in a loop to Stream.ReadAsync using the await keyword. An IObserver<T> interface implementation (from Observable.Create or a block from the Dataflow library; it doesn't matter, it's an implementation detail) is passed to this loop, which then calls the methods on the IObserver<T> implementation, producing the observable.
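To make the loop described above concrete, here is a minimal sketch of such a read loop. The framing (a 4-byte big-endian length followed by a UTF-8 payload) is an assumption chosen for illustration, not the actual Twitter wire format, and LengthPrefixedReader, PumpAsync, FillAsync and ListObserver are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

public static class LengthPrefixedReader
{
    // Reads items framed as [4-byte big-endian length][UTF-8 payload]
    // (an assumed framing) and pushes each one to the observer.
    public static async Task PumpAsync(
        Stream stream, IObserver<string> observer, CancellationToken cancellationToken)
    {
        try
        {
            var header = new byte[4];
            while (await FillAsync(stream, header, cancellationToken).ConfigureAwait(false))
            {
                int length = (header[0] << 24) | (header[1] << 16) | (header[2] << 8) | header[3];
                var payload = new byte[length];
                if (!await FillAsync(stream, payload, cancellationToken).ConfigureAwait(false))
                    throw new EndOfStreamException("Stream ended in the middle of an item.");
                observer.OnNext(Encoding.UTF8.GetString(payload));
            }
        }
        catch (Exception e)
        {
            // Any failure (I/O error, cancellation, bad length) ends the sequence.
            observer.OnError(e);
            return;
        }
        observer.OnCompleted();
    }

    // Stream.ReadAsync may return fewer bytes than requested, so keep reading
    // until the buffer is full. Returns false if the stream ends cleanly
    // before the first byte of a non-empty buffer.
    private static async Task<bool> FillAsync(
        Stream stream, byte[] buffer, CancellationToken cancellationToken)
    {
        int read = 0;
        while (read < buffer.Length)
        {
            int n = await stream.ReadAsync(buffer, read, buffer.Length - read, cancellationToken)
                .ConfigureAwait(false);
            if (n == 0)
            {
                if (read == 0) return false;
                throw new EndOfStreamException("Stream ended in the middle of an item.");
            }
            read += n;
        }
        return true;
    }
}

// Hypothetical helper observer that records what it receives.
public sealed class ListObserver : IObserver<string>
{
    public List<string> Items { get; } = new List<string>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }
    public void OnNext(string value) { Items.Add(value); }
    public void OnError(Exception error) { Error = error; }
    public void OnCompleted() { Completed = true; }
}
```

The loop itself only depends on Stream and IObserver<string>, so the same code can be driven by Observable.Create or by a wrapper around a Dataflow block.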
However, there are a number of things that have to be done before this loop begins processing, and they require calls to Task<T>-returning methods, all of which are more easily called in C# 5.0 using the await keyword. Something like this:
public async Task<IObservable<string>> Create(string parameter,
    CancellationToken cancellationToken)
{
    // Make some call that requires await.
    var response = await _httpClient.SendAsync(parameter, cancellationToken).
        ConfigureAwait(false);
    // Create a BufferBlock which will be the observable.
    var block = new BufferBlock<string>();
    // Start some background task which will use the block and publish to it
    // on a separate task. This is not something that is awaited.
    ConsumeResponse(response, block, cancellationToken);
    // Create the observable.
    return block.AsObservable();
}
That said, I'm currently returning a Task<IObservable<T>> from my method, but I feel that I'm missing something in the Reactive Extensions which would allow me to use await to facilitate the calls I need to make, but also return an IObservable<T> instead of a Task<IObservable<T>>.
What method in the Reactive Extensions will allow me to create an observable that requires awaiting methods before returning from the creation method?
The closest thing I've found is Observable.DeferAsync. Assuming that the call to my method and use of the observable is something along the lines of:
public async Task Observe()
{
    // NOT the real name of the interface, but explains its role here.
    IObservableFactory factory;
    // Create is really named something else.
    IObservable<string> observable = factory.Create("parameter");
    // Subscribe.
    observable.Subscribe(o => Console.WriteLine("Observed: {0}", o));
    // Wait.
    await observable;
}
Using DeferAsync won't work here, as the call to Subscribe will send the first request and then read off that, and then the call to await on observable will create a second subscription, but to a different observable.
Or, ultimately, is returning Task<IObservable<T>> the appropriate method of doing this in the Reactive Framework?
Subsequently, since the method returns a Task<T>, it's good practice to pass a CancellationToken to cancel the operation. That said, I can understand the CancellationToken being used to cancel the creation of the observable, but should it also be used to cancel the actual observable (as it could be passed on down to read the stream, etc.)?
My gut says no, because there's a violation of separation of concerns here as well as the DRY principle with the cancellation: Subscribe is going to return an IDisposable implementation which will cancel the subscription.

I wouldn't return a Task<IObservable<T>>. Mixing Tasks and Observables in your public API just ends up being confusing. Remember that Tasks can be thought of as observables that produce a single value. This also means don't mix CancellationTokens with observables in your public API. You control observables by subscribing and unsubscribing.
That doesn't mean you can't mix the concepts behind the scenes. Here's how to do what you want using Observable.Using, Task.ToObservable and CancellationDisposable.
First, modify your method to return a Task<ISourceBlock<string>>:
public async Task<ISourceBlock<string>> CreateBlock(string parameter, CancellationToken cancellationToken)
{
    // Make some call that requires await.
    var response = await _httpClient.SendAsync(parameter, cancellationToken).ConfigureAwait(false);
    // Create a BufferBlock which will be the observable.
    var block = new BufferBlock<string>();
    // Start some background task which will use the block and publish to it
    // on a separate task. This is not something that is awaited.
    ConsumeResponse(response, block, cancellationToken);
    return block;
}
Now here is your new Create method that uses the method above:
public IObservable<string> Create(string parameter)
{
    // Create a cancellation token that will be canceled when the observable
    // is unsubscribed, and use this token in the call to CreateBlock.
    // Use ToObservable() to convert the Task to an observable so we can then
    // use SelectMany to subscribe to the block itself once it is available.
    return Observable.Using(() => new CancellationDisposable(),
        cd => CreateBlock(parameter, cd.Token)
            .ToObservable()
            .SelectMany(block => block.AsObservable()));
}
Edit: I've since discovered that Rx already implements this pattern with FromAsync:
public IObservable<string> Create(string parameter)
{
    return Observable.FromAsync(token => CreateBlock(parameter, token))
        .SelectMany(block => block.AsObservable());
}
And also, DeferAsync, which is even more appropriate since your Task is actually creating the Observable you really want to observe (e.g. your block):
public IObservable<string> Create(string parameter)
{
    return Observable.DeferAsync(async token => (await CreateBlock(parameter, token)).AsObservable());
}
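As a rough illustration of what "subscribing and unsubscribing" means for the DeferAsync version, the sketch below counts how often the asynchronous factory runs. Task.Delay and Observable.Return are hypothetical stand-ins for the awaited HTTP call and for block.AsObservable(), and DeferAsyncDemo and FactoryCalls are names made up for this example.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class DeferAsyncDemo
{
    // Counts how many times the asynchronous factory has been invoked.
    public static int FactoryCalls;

    public static IObservable<string> Create(string parameter)
    {
        return Observable.DeferAsync<string>(async token =>
        {
            Interlocked.Increment(ref FactoryCalls);

            // Stand-in for the awaited request; the token is tied to the
            // lifetime of this one subscription.
            await Task.Delay(10, token).ConfigureAwait(false);

            // Stand-in for block.AsObservable().
            return Observable.Return(parameter);
        });
    }
}
```

Each subscription should run the factory again, so subscribing twice to the result of Create (for example once with Subscribe and once with await) is expected to start two separate requests, which is the behavior the question describes. Disposing a subscription is what signals the token passed to the factory.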
I don't see why you would need to create the BufferBlock only after that await. What you could do instead is to have a synchronous method that creates the BufferBlock, starts the asynchronous initialization and then immediately returns. Something like:
public IObservable<string> Create(
    string parameter, CancellationToken cancellationToken)
{
    // Create a BufferBlock which will be the observable.
    var block = new BufferBlock<string>();
    // Start asynchronous initialization, but don't wait for the result.
    InitializeAsync(parameter, block, cancellationToken);
    // Create the observable.
    return block.AsObservable();
}

private async Task InitializeAsync(
    string parameter, ITargetBlock<string> block,
    CancellationToken cancellationToken)
{
    // Make some call that requires await.
    var response = await _httpClient.SendAsync(parameter, cancellationToken).
        ConfigureAwait(false);
    // Start some background task which will use the block and publish to it
    // on a separate task. This is not something that is awaited.
    ConsumeResponse(response, block, cancellationToken);
}
(You'll probably also want to handle errors in InitializeAsync() by calling Fault() on the passed-in block.)
This way, the Create() method returns just an IObservable<T>, but it also performs initialization asynchronously.
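The error handling mentioned above could look roughly like the following sketch. The connectAndConsume delegate is a hypothetical stand-in for the awaited HTTP call plus ConsumeResponse, so that the example does not depend on code that is not shown here; FaultingInitializer is likewise a made-up name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FaultingInitializer
{
    public static async Task InitializeAsync(
        ITargetBlock<string> block,
        Func<ITargetBlock<string>, CancellationToken, Task> connectAndConsume,
        CancellationToken cancellationToken)
    {
        try
        {
            // Hypothetical stand-in for SendAsync followed by ConsumeResponse.
            await connectAndConsume(block, cancellationToken).ConfigureAwait(false);
        }
        catch (Exception e)
        {
            // Put the block into a faulted state so the failure is not lost
            // in a task that nobody awaits. Note that this sketch treats
            // cancellation like any other failure.
            block.Fault(e);
        }
    }
}
```

Fault() is declared on IDataflowBlock, so it can be called directly on the ITargetBlock<string> parameter. Once the block is faulted, its Completion task faults as well, and anything observing the block can react to that.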
Or, ultimately, is returning Task<IObservable<T>> the appropriate method of doing this in the Reactive Framework?
I don't think so. I think there is no need for two levels of asynchrony here.
The cancellation of the creation and the cancellation of the observable are two separate things.
This will depend on your exact requirements, but in general, I don't think they are separate things. You want to cancel the operation and it doesn't matter if it already started or not.
This is similar to how a CancellationToken passed into Task.Run() behaves: it's used both to cancel the Task before it starts executing and to detect proper cancellation if it has already started.
Calling Subscribe is going to return an IDisposable implementation which will cancel the subscription.
Yes, but that's all it will do. Since what you're describing here is that you want a hot observable (which produces items regardless of observers), it won't actually cancel the observable, only the subscription.