Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

What in the Rx Framework allows me to return an IObservable<T> while awaiting other methods during creation?

I've been working on creating an IObservable<T> implementation using the Reactive Extensions for Twitter's streaming APIs.

From a high level an HTTP request is sent and the connection is kept open. Length-prefixed items are sent to consume.

Basically, it's a call in a loop to Stream.ReadAsync using the await keyword. An IObserver<T> interface implementation (from Observable.Create or a block from the Dataflow library, it doesn't matter, it's an implementation detail) is passed to this loop which then calls the metods on the IObserver<T> implementation, producing the observable.

However, there's a number of things that have to be done before this loop begins processing which require calls to Task<T>-returning methods, all of which are more easily called in C# 5.0 using the await keyword. Something like this:

public async Task<IObservable<string>> Create(string parameter,
    CancellationToken cancellationToken)
{
     // Make some call that requires await.
     var response = await _httpClient.SendAsync(parameter, cancellationToken).
         ConfigureAwait(false);

     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<T>();

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response, block, cancellationToken);

     // Create the observable.
     return block.AsObservable();
}

That said, I'm currently returning a Task<IObservable<T>> from my method, but I feel that I'm missing something in the Reactive Extensions which would allow me to use await to facilitate the calls I need to make, but also return an IObservable<T> instead of a Task<IObservable<T>>.

What method in the Reactive Extensions will allow me to create an observable that requires awaiting methods before returning from the creation method?

The closest thing I've found is Observable.DeferAsync. Assuming that the call to my method and use of the observable is something along the lines of:

public async Task Observe()
{
    // NOT the real name of the interface, but explains it's role here.
    IObservableFactory factory;

    // Create is really named something else.
    IObservable<string> observable = factory.Create("parameter");

    // Subscribe.
    observable.Subscribe(o => Console.WriteLine("Observed: {0}", o));

    // Wait.
    await observable;
}

Using DeferAsync won't work here, as the call to Subscribe will send the first request, and then read off that, and then the call to await on observable will create a second subscription, but to a different observable.

Or, ultimately, is returning Task<IObservable<T>> the appropriate method of doing this in the Reactive Framework?

Subsequently, since the method returns a Task<T>, it's good practice to pass a CancellationToken to cancel the operation. That said, I can understand the CancellationToken being used to cancel the creation of the observable, but should it also be used to cancel the actual observable (as it could be passed on down to read the stream, etc).

My gut says no, because there's a violation of separation of concerns here as well as the DRY principle with the cancellation:

  • The cancellation of the creation and the cancellation of the observable are two separate things.
  • Calling Subscribe is going to return an IDisposable implementation which will cancel the subscription.
like image 461
casperOne Avatar asked Mar 31 '13 14:03

casperOne


2 Answers

I wouldn't return a Task<IObservable<T>>. Mixing Tasks and Observables in your public API just end up being confusing. Remember that Tasks can be thought of as observables that produce a single value. This also means don't mix CancellationTokens with observables in your public API. You control observables by subscribing and unsubscribing.

That doesn't mean you can't mix the concepts behind the scenes. Here's how to do what you want using Observable.Using, Task.ToObservable and CancellationDisposable

First, modify your method to return a Task<ISourceBlock<string>>:

public async Task<ISourceBlock<string>> CreateBlock(string parameter, CancellationToken cancellationToken)
{
     // Make some call that requires await.
     var response = await _httpClient.SendAsync(parameter, cancellationToken).ConfigureAwait(false);

     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<T>();

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response, block, cancellationToken);

     return block;
}

Now here is your new Create method that uses the method above:

public IObservable<string> Create(string parameter)
{
    // Create a cancellation token that will be canceled when the observable is unsubscribed, use this token in your call to CreateBlock.
    // Use ToObservable() to convert the Task to an observable so we can then
    // use SelectMany to subscribe to the block itself once it is available
    return Observable.Using(() => new CancellationDisposable(),
           cd => CreateBlock(parameter, cd.Token)
               .ToObservable()
               .SelectMany(block => block.AsObservable()));
}

Edit: I've since discovered Rx already has implemented this pattern with FromAsync:

public IObservable<string> Create(string parameter)
{
    return Observable.FromAsync(token => CreateBlock(parameter, token))
                     .SelectMany(block => block.AsObservable());
}

And also, DeferAsync, which is even more appropriate since your Task is actually creating the Observable you really want to observe (e.g. your block):

public IObservable<string> Create(string parameter)
{
    return Observable.DeferAsync(async token => (await CreateBlock(parameter, token)).AsObservable());
}
like image 146
Brandon Avatar answered Nov 03 '22 09:11

Brandon


I don't see why you would need to create the BufferBlock only after that await. What you could do instead is to have a synchronous method that creates the BufferBlock, starts the asynchronous initialization and then immediately returns. Something like:

public IObservable<string> Create(
    string parameter, CancellationToken cancellationToken)
{
     // Create a BufferBlock which will be the observable.
     var block = new BufferBlock<string>();

     // Start asynchronous initialization, but don't wait for the result
     InitializeAsync(parameter, block, cancellationToken);

     // Create the observable.
     return block.AsObservable();
}

private async Task InitializeAsync(
    string parameter, ITargetBlock<string> block,
    CancellationToken cancellationToken)
{
     // Make some call that requires await.
     var response = await _httpClient.SendAsync(parameter, cancellationToken).
         ConfigureAwait(false);

     // Start some background task which will use the block and publish to it
     // on a separate task.  This is not something that is awaited for.
     ConsumeResponse(response, block, cancellationToken);
}

(You'll probably also want to handle errors in InitializeAsync() by calling Fault() of the passed in block.)

This way, the Create() method returns just IObservable<T>, but it also performs initialization asynchronously.

Or, ultimately, is returning Task<IObservable<T>> the appropriate method of doing this in the Reactive Framework?

I don't think so. I think there is no need for two levels of asynchrony here.

The cancellation of the creation and the cancellation of the observable are two separate things.

This will depend on your exact requirements, but in general, I don't think they are separate things. You want to cancel the operation and it doesn't matter if it already started or not.

This is similar to how CancellationToken passed into Task.Run() behaves: it's used both to cancel the Task before it starts executing and to detect proper cancellation if it already started.

Calling Subscribe is going to return an IDisposable implementation which will cancel the subscription.

Yes, but that's all it will do. Since what you're describing here is that you want a hot observable (which produces items regardless of observers), it won't actually cancel the observable, only the subscription.

like image 35
svick Avatar answered Nov 03 '22 08:11

svick