
TPL Dataflow and Rx Combined example [closed]

I just want to learn both and how to use them together. I understand that they can complement each other; I just could not find an example of someone actually doing it.

naeron84 asked Jul 08 '12 14:07


1 Answer

Let me start with a bit of background.

The .NET framework has a number of special types - classes, structs, or interfaces - such as Task<T>, IObservable<T>, Nullable<T>, IEnumerable<T>, Lazy<T>, etc. that provide special powers to the underlying type T.

The TPL uses Task<T> to represent asynchronous computation of a single value of T.

Rx uses IObservable<T> to represent asynchronous computation of zero or more values of T.

It's the "asynchronous computation" aspect of both of these that brings TPL and Rx together.

Now, the TPL also uses the type Task to represent the asynchronous execution of an Action lambda, but this can be considered a special case of Task<T> where T is void, very much like a standard C# method that returns void:

public void MyMethod() { }

Rx also allows for the same special case with the use of a special type called Unit.
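As a minimal sketch of that special case, passing an Action to Observable.Start gives back an IObservable<Unit>: a sequence that produces a single Unit value and then completes. The class name UnitExample is only illustrative, and the sketch assumes the System.Reactive NuGet package is referenced.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical example class; assumes the System.Reactive package.
class UnitExample
{
    static void Main()
    {
        // An Action has no return value, so Rx reports completion with Unit.
        IObservable<Unit> work = Observable.Start(() => Console.WriteLine("Working..."));

        // Wait() blocks until the sequence completes and returns its last value.
        Unit result = work.Wait();

        // Unit has exactly one value, Unit.Default, so this prints True.
        Console.WriteLine(result == Unit.Default);
    }
}
```

Blocking with Wait() is only there to keep the console program alive until the work finishes; in real code you would normally subscribe or await instead.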

The difference between the TPL and Rx is in the number of values returned. A task produces one and only one value, whereas an observable produces zero or more.
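To make the "one versus zero or more" point concrete, here is a small sketch. The class name Cardinality is made up for illustration, and the code again assumes the System.Reactive package.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical example class; assumes the System.Reactive package.
class Cardinality
{
    static void Main()
    {
        // A task always resolves to exactly one value.
        Task<int> one = Task.FromResult(42);
        Console.WriteLine(one.Result);

        // An observable may complete without producing anything...
        Observable.Empty<int>().Subscribe(
            x => Console.WriteLine(x),
            () => Console.WriteLine("empty: completed with no values"));

        // ...or produce several values before completing.
        Observable.Range(1, 3).Subscribe(
            x => Console.WriteLine(x),
            () => Console.WriteLine("range: completed after three values"));
    }
}
```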

So, if you treat Rx in a special way, by only working with observable sequences that return a single value, you can do some computations in a similar way to the TPL.

For example, in the TPL I could write:

Task.Factory
    .StartNew(() => "Hello")
    .ContinueWith(t => Console.WriteLine(t.Result));

And in Rx the equivalent would be:

Observable
    .Start(() => "Hello")
    .Subscribe(x => Console.WriteLine(x));

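I could go one step further in Rx by specifying that the TPL should be used to execute the computation, like so (Scheduler.TaskPool is the Rx 1.x name; in Rx 2.0 and later the same scheduler is exposed as TaskPoolScheduler.Default):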

Observable
    .Start(() => "Hello", Scheduler.TaskPool)
    .Subscribe(x => Console.WriteLine(x));

(By default the Thread Pool is used.)

Now I could do some "mixing and matching". If I add a using directive for the System.Reactive.Threading.Tasks namespace I can move between tasks and observables quite easily.

Task.Factory
    .StartNew(() => "Hello")
    .ToObservable()
    .Subscribe(x => Console.WriteLine(x));

Observable
    .Start(() => "Hello")
    .ToTask()
    .ContinueWith(t => Console.WriteLine(t.Result));

Notice the .ToObservable() and .ToTask() calls and the resulting flips from one library to the other.

If I have an observable that returns more than one value I can use the observable .ToArray() extension method to turn multiple sequence values into a single array value that can be turned into a task. Like so:

Observable
    .Interval(TimeSpan.FromSeconds(1.0))
    .Take(5) // is IObservable<long>
    .ToArray()
    .ToTask() // is Task<long[]>
    .ContinueWith(t => Console.WriteLine(t.Result.Length));
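
If you want to run the conversions above as one program, the following is a rough sketch rather than the only way to do it. It assumes the System.Reactive package and a compiler that supports async Main (C# 7.1 or later); the class name RoundTrip is illustrative, and Task.Run and await are used in place of Task.Factory.StartNew and ContinueWith purely for brevity.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical example class; assumes the System.Reactive package.
class RoundTrip
{
    static async Task Main()
    {
        // Task -> observable: the task's result becomes a single-value sequence.
        IObservable<string> fromTask = Task.Run(() => "Hello").ToObservable();

        // Observable -> task: ToArray() first gathers every value into one array.
        Task<long[]> asTask = Observable
            .Interval(TimeSpan.FromMilliseconds(100))
            .Take(5)      // IObservable<long> producing 0, 1, 2, 3, 4
            .ToArray()    // IObservable<long[]> with a single array value
            .ToTask();    // Task<long[]>

        // Flip the single-value observable back into a task and await both.
        string greeting = await fromTask.ToTask();
        long[] ticks = await asTask;

        Console.WriteLine(greeting);                 // Hello
        Console.WriteLine(string.Join(",", ticks));  // 0,1,2,3,4
    }
}
```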

I think this is a fairly basic answer to your question. Is it what you were expecting?

Enigmativity answered Oct 30 '22 18:10