Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using Rx to block (and possibly timeout) on an asynchronous operation

I'm trying to rewrite some code using Reactive Extensions for .NET but I need some guidance on how to achieve my goal.

I have a class that encapsulates some asynchronous behavior in a low level library. Think something that either reads or writes the network. When the class is started it will try to connect to the environment and when succesful it will signal this back by calling from a worker thread.

I want to turn this asynchronous behavior into a synchronous call and I have created a greatly simplified example below on how that can be achieved:

ManualResetEvent readyEvent = new ManualResetEvent(false);

public void Start(TimeSpan timeout) {
  // Simulate a background process
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Wait for startup to complete.
  if (!this.readyEvent.WaitOne(timeout))
    throw new TimeoutException();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay); // Simulate startup delay.
  this.readyEvent.Set();
}

Running AsyncStart on a worker thread is just a way to simulate the asynchronous behavior of the library and is not part of my real code where the low level library supplies the thread and calls my code on a callback.

Notice that the Start method will throw a TimeoutException if start hasn't completed within the timeout interval.

I want to rewrite this code to use Rx. Here is my first attempt:

Subject<Unit> readySubject = new Subject<Unit>();

public void Start(TimeSpan timeout) {
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
  // Point A - see below
  this.readySubject.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

This is a decent attempt but unfortunately it contains a race condition. If the startup completes fast (e.g. if delay is 0) and if there is an additonal delay at point A then OnNext will be called on readySubject before First has executed. In essence the IObservable I'm applying Timeout and First never sees that startup has completed and a TimeoutException will be thrown instead.

It seems that Observable.Defer has been created to handle problems like this. Here is slightly more complex attempt to use Rx:

Subject<Unit> readySubject = new Subject<Unit>();

void Start(TimeSpan timeout) {
  var ready = Observable.Defer(() => {
    ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1)));
    // Point B - see below
    return this.readySubject.AsObservable();
  });
  ready.Timeout(timeout).First();
}

void AsyncStart(TimeSpan delay) {
  Thread.Sleep(delay);
  this.readySubject.OnNext(new Unit());
}

Now the asynchronous operation is not started immediately but only when the IObservable is being used. Unfortunately there is still a race condition but this time at point B. If the asynchronous operation started calls OnNext before the Defer lambda returns it is still lost and a TimeoutException will be thrown by Timeout.

I know I can use operators like Replay to buffer events but my initial example without Rx doesn't use any kind of buffering. Is there a way for me to use Rx to solve my problem without race conditions? In essence starting the asynchronous operation only after the IObservable has been connected to in this case Timeout and First?


Based on Paul Betts's answer here is working solution:

void Start(TimeSpan timeout) {
  var readySubject = new AsyncSubject<Unit>();
  ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject, TimeSpan.FromSeconds(1)));
  // Point C - see below
  readySubject.Timeout(timeout).First();
}

void AsyncStart(ISubject<Unit> readySubject, TimeSpan delay) {
  Thread.Sleep(delay);
  readySubject.OnNext(new Unit());
  readySubject.OnCompleted();
}

The interesting part is when there is a delay at point C that is longer than the time it takes for AsyncStart to complete. AsyncSubject retains the last notification sent and Timeout and First will still perform as expected.

like image 290
Martin Liversage Avatar asked Jan 17 '11 17:01

Martin Liversage


1 Answers

So, one thing to know about Rx I think a lot of people do at first (myself included!): if you're using any traditional threading function like ResetEvents, Thread.Sleeps, or whatever, you're Doing It Wrong (tm) - it's like casting things to Arrays in LINQ because you know that the underlying type happens to be an array.

The key thing to know is that an async func is represented by a function that returns IObservable<TResult> - that's the magic sauce that lets you signal when something has completed. So here's how you'd "Rx-ify" a more traditional async func, like you'd see in a Silverlight web service:

IObservable<byte[]> readFromNetwork()
{
    var ret = new AsyncSubject();
    // Here's a traditional async function that you provide a callback to
    asyncReaderFunc(theFile, buffer => {
        ret.OnNext(buffer);
        ret.OnCompleted();
    });

    return ret;
}

This is a decent attempt but unfortunately it contains a race condition.

This is where AsyncSubject comes in - this makes sure that even if asyncReaderFunc beats the Subscribe to the punch, AsyncSubject will still "replay" what happened.

So, now that we've got our function, we can do lots of interesting things to it:

// Make it into a sync function
byte[] results = readFromNetwork().First();

// Keep reading blocks one at a time until we run out
readFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => {
    Console.WriteLine("Read {0} bytes in chunk", bytes.Length);
})

// Read the entire stream and get notified when the whole deal is finished
readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
    .Subscribe(ms => {
        Console.WriteLine("Got {0} bytes in total", ms.ToArray().Length);
    });

// Or just get the entire thing as a MemoryStream and wait for it
var memoryStream = readFromNetwork()
    .Repeat().TakeUntil(x => x == null || x.Length == 0)
    .Aggregate(new MemoryStream(), (ms, bytes) => ms.Write(bytes))
    .First();
like image 132
Ana Betts Avatar answered Nov 10 '22 04:11

Ana Betts