So in the sad days of C# 4.0, I created the following "WorkflowExecutor" class, which allows asynchronous workflows on the GUI thread by hacking into IEnumerable's "yield return" continuations to wait for observables. When button1 is clicked, the following code starts a simple workflow that updates the text, waits for you to click button2, and loops after 1 second.
    public sealed partial class Form1 : Form
    {
        readonly Subject<Unit> _button2Subject = new Subject<Unit>();
        readonly WorkflowExecutor _workflowExecutor = new WorkflowExecutor();

        public Form1()
        {
            InitializeComponent();
        }

        IEnumerable<IObservable<Unit>> CreateAsyncHandler()
        {
            Text = "Initializing";
            var scheduler = new ControlScheduler(this);
            while (true)
            {
                yield return scheduler.WaitTimer(1000);
                Text = "Waiting for Click";
                yield return _button2Subject;
                Text = "Click Detected!";
                yield return scheduler.WaitTimer(1000);
                Text = "Restarting";
            }
        }

        void button1_Click(object sender, EventArgs e)
        {
            _workflowExecutor.Run(CreateAsyncHandler());
        }

        void button2_Click(object sender, EventArgs e)
        {
            _button2Subject.OnNext(Unit.Default);
        }

        void button3_Click(object sender, EventArgs e)
        {
            _workflowExecutor.Stop();
        }
    }

    public static class TimerHelper
    {
        public static IObservable<Unit> WaitTimer(this IScheduler scheduler, double ms)
        {
            return Observable.Timer(TimeSpan.FromMilliseconds(ms), scheduler).Select(_ => Unit.Default);
        }
    }

    public sealed class WorkflowExecutor
    {
        IEnumerator<IObservable<Unit>> _observables;
        IDisposable _subscription;

        public void Run(IEnumerable<IObservable<Unit>> actions)
        {
            _observables = (actions ?? new IObservable<Unit>[0]).GetEnumerator();
            Continue();
        }

        void Continue()
        {
            if (_subscription != null)
            {
                _subscription.Dispose();
            }
            if (_observables.MoveNext())
            {
                _subscription = _observables.Current.Subscribe(_ => Continue());
            }
        }

        public void Stop()
        {
            Run(null);
        }
    }
The smart part of the idea, using "yield" continuations to do the asynchronous work, was taken from Daniel Earwicker's AsyncIOPipe idea: http://smellegantcode.wordpress.com/2008/12/05/asynchronous-sockets-with-yield-return-of-lambdas/. I then added the reactive framework on top of it.
Now I'm having trouble rewriting this using the async feature in C# 5.0, though it seems like it should be a straightforward thing to do. When I convert the observables to tasks, they only run once, and the while loop crashes the second time around. Any help fixing that would be great.
All that said/asked, what does the async/await mechanism give me that the WorkflowExecutor doesn't? Is there anything I can do with async/await that I can't just do (given a similar amount of code) with the WorkflowExecutor?
To use await with RxJS observables, we have to convert them to a promise first. To do that, we can use the firstValueFrom or lastValueFrom functions. firstValueFrom returns a promise that resolves to the first value emitted by an observable; lastValueFrom returns a promise that resolves to the last value emitted before the observable completes.
Using Async/Await in Angular

One of the best improvements in JavaScript is the async/await feature introduced in ECMAScript 2017. Async/await is built on top of promises and lets you write asynchronous code that reads like synchronous code, which simplifies the flow and makes the logic easier to follow.
Promises deal with one asynchronous event at a time, while observables handle a sequence of asynchronous events over a period of time.
Observables provide support for passing messages between parts of your application. They are used frequently in Angular and are a technique for event handling, asynchronous programming, and handling multiple values.
As James mentioned, you can await an IObservable<T> sequence starting with Rx v2.0 Beta. The behavior is to return the last element (before the OnCompleted), or throw the OnError that was observed. If the sequence contains no elements, you'll get an InvalidOperationException out.
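To make those three outcomes concrete, here is a minimal sketch, assuming the System.Reactive package (Rx 2.0 or later) is referenced. The class and method names are hypothetical and exist only for illustration.

```csharp
using System;
using System.Reactive.Linq;   // brings the GetAwaiter extension for IObservable<T> into scope
using System.Threading.Tasks;

// Hypothetical helper class, not part of Rx.
public static class AwaitSemantics
{
    // Awaiting a sequence yields its last element once OnCompleted arrives.
    public static async Task<int> LastOfRangeAsync()
    {
        return await Observable.Range(1, 5);
    }

    // Awaiting an empty sequence throws InvalidOperationException.
    public static async Task<bool> EmptyThrowsAsync()
    {
        try
        {
            await Observable.Empty<int>();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // An OnError notification is rethrown at the await site.
    public static async Task<string> ErrorMessageAsync()
    {
        try
        {
            await Observable.Throw<int>(new ArgumentException("boom"));
            return "no error";
        }
        catch (ArgumentException ex)
        {
            return ex.Message;
        }
    }
}
```

Here `LastOfRangeAsync` completes with 5, `EmptyThrowsAsync` with true, and `ErrorMessageAsync` with "boom".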
Notice using this, you can get all other desired behaviors:
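As a sketch of what those other behaviors might look like (again assuming System.Reactive 2.0 or later; the class and method names are hypothetical), you pick the element you want with an operator and then await the resulting sequence:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical helper class, not part of Rx.
public static class AwaitVariants
{
    public static async Task<string> DescribeAsync()
    {
        IObservable<int> xs = Observable.Range(0, 10);   // cold sequence: 0..9 for every subscriber

        int first = await xs.FirstAsync();               // first element instead of the last
        int orDefault = await Observable.Empty<int>()
                                        .FirstOrDefaultAsync(); // default(int) rather than an exception
        int fourth = await xs.ElementAt(3);              // element at a zero-based index
        IList<int> all = await xs.ToList();              // every element, collected into a list

        return string.Format("{0} {1} {2} {3}", first, orDefault, fourth, all.Count);
    }
}
```

`DescribeAsync` completes with the string "0 0 3 10". Each operator returns another IObservable, so the same await rule (last element wins) applies to a sequence that has been narrowed to a single element.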
You can do even more fancy things, like computing the result of an aggregation but observe intermediate values by using Do and Scan:
    var xs = Observable.Range(0, 10, Scheduler.Default);
    var res = xs.Scan((x, y) => x + y)
                .Do(x => { Console.WriteLine("Busy. Current sum is {0}", x); });

    Console.WriteLine("Done! The sum is {0}", await res);
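As you noticed, Task is very much a one-time use thing, as opposed to Observable's "stream of events". A good way of thinking of this (IMHO) is the 2x2 chart on the Rx team's post about 2.0 Beta, which sorts the four types by synchronous vs. asynchronous and single vs. multiple values: T (synchronous, single), IEnumerable<T> (synchronous, multiple), Task<T> (asynchronous, single), and IObservable<T> (asynchronous, multiple).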
Depending on circumstance (one-time vs. 'stream' of events), keeping Observable might make more sense.
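The failing task-based code isn't shown in the question, so this is only a guess at the cause: if a single task (or TaskCompletionSource) is reused across loop iterations, the second pass has nothing new to wait for, and completing it again throws. The sketch below uses only the base class library; the class and method names are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class illustrating that a task completes at most once.
public static class OneShotDemo
{
    // A TaskCompletionSource can be completed only once; a second SetResult throws.
    public static bool SecondSetResultThrows()
    {
        var tcs = new TaskCompletionSource<int>();
        tcs.SetResult(1);
        try
        {
            tcs.SetResult(2);
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // Creating a fresh source per iteration gives each pass of the loop its own task.
    public static async Task<int> SumOverIterationsAsync(int iterations)
    {
        int sum = 0;
        for (int i = 1; i <= iterations; i++)
        {
            var tcs = new TaskCompletionSource<int>();
            tcs.SetResult(i);      // stands in for a button click or timer tick
            sum += await tcs.Task;
        }
        return sum;
    }
}
```

`SecondSetResultThrows()` returns true, and `SumOverIterationsAsync(3)` completes with 6. An observable avoids the problem differently: each subscription (or each `await xs.FirstAsync()`) waits for the next notification, so nothing has to be recreated by hand.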
If you can hop up to the Reactive 2.0 Beta, then you can 'await' observables with that. For instance, my own attempt at an 'async/await' (approximate) version of your code would be:
    public sealed partial class Form1 : Form
    {
        readonly Subject<Unit> _button2Subject = new Subject<Unit>();
        private bool shouldRun = false;

        public Form1()
        {
            InitializeComponent();
        }

        async Task CreateAsyncHandler()
        {
            Text = "Initializing";
            while (shouldRun)
            {
                await Task.Delay(1000);
                Text = "Waiting for Click";
                await _button2Subject.FirstAsync();
                Text = "Click Detected!";
                await Task.Delay(1000);
                Text = "Restarting";
            }
        }

        async void button1_Click(object sender, EventArgs e)
        {
            shouldRun = true;
            await CreateAsyncHandler();
        }

        void button2_Click(object sender, EventArgs e)
        {
            _button2Subject.OnNext(Unit.Default);
        }

        void button3_Click(object sender, EventArgs e)
        {
            shouldRun = false;
        }
    }
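One difference from the original WorkflowExecutor is that the `shouldRun` flag is only checked at the top of the loop, so Stop does not take effect while the handler is waiting on a delay or a click. A possible variation, sketched here under the assumption that System.Reactive 2.0 or later is available, threads a CancellationToken through every wait. The class name, the `setText` callback (standing in for assigning `Form.Text`), and the `pause` parameter are hypothetical.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;  // ToTask extension
using System.Threading;
using System.Threading.Tasks;

// Hypothetical UI-independent version of the workflow loop.
public static class CancellableWorkflow
{
    // clicks stands in for _button2Subject; setText stands in for assigning Form.Text.
    public static async Task RunAsync(
        IObservable<Unit> clicks,
        Action<string> setText,
        TimeSpan pause,
        CancellationToken token)
    {
        setText("Initializing");
        while (true)
        {
            await Task.Delay(pause, token);            // throws OperationCanceledException when cancelled
            setText("Waiting for Click");
            await clicks.FirstAsync().ToTask(token);   // cancellation also interrupts the wait for a click
            setText("Click Detected!");
            await Task.Delay(pause, token);
            setText("Restarting");
        }
    }
}
```

In the form, button1 would create a CancellationTokenSource and call `RunAsync(_button2Subject, t => Text = t, TimeSpan.FromSeconds(1), cts.Token)`, and button3 would call `cts.Cancel()`. The returned task then ends in the Canceled state, so the caller should expect an OperationCanceledException when awaiting it.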