
awaiting on an observable

Back in the sad days of C# 4.0, I created the following "WorkflowExecutor" class, which allows asynchronous workflows on the GUI thread by hacking into IEnumerable's "yield return" continuations to wait for observables. The following code, on button1_Click, starts a simple workflow that updates the text, waits for you to click button2, and loops after 1 second.

    using System;
    using System.Collections.Generic;
    using System.Reactive;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Windows.Forms;

    public sealed partial class Form1 : Form {
        readonly Subject<Unit> _button2Subject = new Subject<Unit>();
        readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor();

        public Form1() {
            InitializeComponent();
        }

        // Each "yield return" hands the executor an observable to wait on;
        // the method resumes when that observable produces a value.
        IEnumerable<IObservable<Unit>> CreateAsyncHandler() {
            Text = "Initializing";
            var scheduler = new ControlScheduler(this);
            while (true) {
                yield return scheduler.WaitTimer(1000);
                Text = "Waiting for Click";
                yield return _button2Subject;
                Text = "Click Detected!";
                yield return scheduler.WaitTimer(1000);
                Text = "Restarting";
            }
        }

        void button1_Click(object sender, EventArgs e) {
            _workflowExecutor.Run(CreateAsyncHandler());
        }

        void button2_Click(object sender, EventArgs e) {
            _button2Subject.OnNext(Unit.Default);
        }

        void button3_Click(object sender, EventArgs e) {
            _workflowExecutor.Stop();
        }
    }

    public static class TimerHelper {
        public static IObservable<Unit> WaitTimer(this IScheduler scheduler, double ms) {
            return Observable.Timer(TimeSpan.FromMilliseconds(ms), scheduler).Select(_ => Unit.Default);
        }
    }

    public sealed class WorkflowExecutor {
        IEnumerator<IObservable<Unit>> _observables;
        IDisposable _subscription;

        public void Run(IEnumerable<IObservable<Unit>> actions) {
            _observables = (actions ?? new IObservable<Unit>[0]).GetEnumerator();
            Continue();
        }

        // Drops the current subscription, advances the iterator one step,
        // and subscribes to whatever observable it yields next.
        void Continue() {
            if (_subscription != null) {
                _subscription.Dispose();
            }
            if (_observables.MoveNext()) {
                _subscription = _observables.Current.Subscribe(_ => Continue());
            }
        }

        // Running an empty workflow disposes the pending subscription.
        public void Stop() {
            Run(null);
        }
    }

The smart part of the idea, using "yield" continuations to do the asynchronous work, was taken from Daniel Earwicker's AsyncIOPipe idea: http://smellegantcode.wordpress.com/2008/12/05/asynchronous-sockets-with-yield-return-of-lambdas/. I then added the reactive framework on top of it.

Now I'm having trouble rewriting this using the async feature in C# 5.0, though it seems like it should be a straightforward thing to do. When I convert the observables to tasks, they only run once, and the while loop crashes the second time around. Any help fixing that would be great.

All that said/asked, what does the async/await mechanism give me that the WorkflowExecutor doesn't? Is there anything I can do with async/await that I can't just do (given a similar amount of code) with the WorkflowExecutor?

Asked by Dax Fohl, Apr 24 '12 00:04


2 Answers

As James mentioned, you can await an IObservable<T> sequence starting with Rx v2.0 Beta. The behavior is to return the last element (before the OnCompleted), or throw the OnError that was observed. If the sequence contains no elements, you'll get an InvalidOperationException out.

Note that by combining this with other operators, you can get the other behaviors you might want:

  • Get the first element by awaiting xs.FirstAsync()
  • Ensure there's only a single value by awaiting xs.SingleAsync()
  • When you're fine with an empty sequence, await xs.DefaultIfEmpty()
  • To get all the elements, await xs.ToArray() or await xs.ToList()
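To make the options above concrete, here is a minimal console sketch. It assumes the System.Reactive package (Rx 2.0 or later) is referenced; the class and method names are made up for illustration and are not part of Rx.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical demo class; only the Rx operators themselves come from the library.
public static class AwaitObservableDemo
{
    public static async Task<string> SummarizeAsync()
    {
        IObservable<int> xs = Observable.Range(1, 3);      // emits 1, 2, 3, then completes
        IObservable<int> empty = Observable.Empty<int>();  // completes without any element

        int last = await xs;                               // plain await: the last element
        int first = await xs.FirstAsync();                 // the first element
        int[] all = await xs.ToArray();                    // every element
        int fallback = await empty.DefaultIfEmpty();       // default(int) for an empty sequence

        bool threw = false;
        try
        {
            await empty;                                   // no elements to return
        }
        catch (InvalidOperationException)
        {
            threw = true;
        }

        return string.Format("{0} {1} {2} {3} {4}",
            last, first, string.Join(",", all), fallback, threw);
    }

    public static async Task Main()
    {
        Console.WriteLine(await SummarizeAsync());         // prints "3 1 1,2,3 0 True"
    }
}
```

Each await subscribes to the sequence afresh, which is why the same xs can be awaited several times here.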

You can do even fancier things, like computing the result of an aggregation while observing intermediate values by using Scan and Do:

    var xs = Observable.Range(0, 10, Scheduler.Default);

    var res = xs.Scan((x, y) => x + y)
                .Do(x => { Console.WriteLine("Busy. Current sum is {0}", x); });

    Console.WriteLine("Done! The sum is {0}", await res);

Answered by Bart De Smet, Sep 29 '22 19:09


As you noticed, Task is very much a one-time use thing, as opposed to Observable's "stream of events". A good way of thinking of this (IMHO) is the 2x2 chart on the Rx team's post about 2.0 Beta:

[Image: 2x2 chart for task vs observable]

Depending on circumstance (one-time vs. 'stream' of events), keeping Observable might make more sense.
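The one-time nature of a Task can be seen in a small console sketch. This is only an illustration, assuming the System.Reactive package is referenced; the class name and the int payloads are invented, and since the question doesn't show the failing code, this may not be the exact cause of the crash described there.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical demo class contrasting a reused Task with a fresh one per wait.
public static class OneShotTaskDemo
{
    public static async Task<string> RunAsync()
    {
        var clicks = new Subject<int>();

        // Converted once: ToTask subscribes immediately and completes on the first value.
        Task<int> once = clicks.FirstAsync().ToTask();

        clicks.OnNext(1);
        int a = await once;       // 1

        clicks.OnNext(2);         // nobody is subscribed any more, so this value is lost
        int b = await once;       // still 1: a completed Task just replays its result

        // A new Task per wait subscribes again and sees the next value.
        Task<int> next = clicks.FirstAsync().ToTask();
        clicks.OnNext(3);
        int c = await next;       // 3

        return string.Format("{0} {1} {2}", a, b, c);
    }

    public static async Task Main()
    {
        Console.WriteLine(await RunAsync());   // prints "1 1 3"
    }
}
```

So inside a loop, the conversion (or the await on the observable itself) has to happen on every iteration rather than once up front.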

If you can hop up to the Reactive 2.0 Beta, then you can 'await' observables with that. For instance, my own attempt at an 'async/await' (approximate) version of your code would be:

    using System;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;
    using System.Threading.Tasks;
    using System.Windows.Forms;

    public sealed partial class Form1 : Form
    {
        readonly Subject<Unit> _button2Subject = new Subject<Unit>();

        private bool shouldRun = false;

        public Form1()
        {
            InitializeComponent();
        }

        async Task CreateAsyncHandler()
        {
            Text = "Initializing";
            // The flag is only checked at the top of each iteration,
            // so a stop request takes effect once the current pass finishes.
            while (shouldRun)
            {
                await Task.Delay(1000);
                Text = "Waiting for Click";
                // FirstAsync() subscribes anew on every iteration.
                await _button2Subject.FirstAsync();
                Text = "Click Detected!";
                await Task.Delay(1000);
                Text = "Restarting";
            }
        }

        async void button1_Click(object sender, EventArgs e)
        {
            shouldRun = true;
            await CreateAsyncHandler();
        }

        void button2_Click(object sender, EventArgs e)
        {
            _button2Subject.OnNext(Unit.Default);
        }

        void button3_Click(object sender, EventArgs e)
        {
            shouldRun = false;
        }
    }

Answered by James Manning, Sep 29 '22 19:09