How can I avoid any blocking when using Observable.FromEventPattern in Reactive Extensions for .NET?

I'm struggling with some concurrency issues when subscribing to an Observable.FromEventPattern() on the TaskPoolScheduler.

Let me illustrate with a code example:

var dataStore = new DataStore();

Observable.FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
    .SubscribeOn(TaskPoolScheduler.Default)
    .Select(x => x.EventArgs)
    .StartWith(new DataChangedEventArgs())
    .Throttle(TimeSpan.FromMilliseconds(25))
    .Select(x => 
    {
        Thread.Sleep(5000); // Simulate long-running calculation.
        var result = 42;
        return result;
    })
    .ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
    .Subscribe(result =>
    {
        // Do some interesting work with the result.
        // ...

        // Do something that makes the DataStore raise another event.
        dataStore.RaiseDataChangedEvent(); // <- DEADLOCK!
    });

dataStore.RaiseDataChangedEvent(); // <- Returns immediately, i.e. does NOT wait for long-running calculation.
dataStore.RaiseDataChangedEvent(); // <- Blocks while waiting for the previous long-running calculation to complete, then returns eventually.

My issue is that when the original observable Observable.FromEventPattern() emits new items (i.e. when the DataStore object raises new DataChanged events), they appear to be blocked, waiting for the previous items to finish flowing through the entire pipeline.

Since the subscribing is done on the TaskPoolScheduler, I had expected every newly emitted item to simply spin up a new task. Instead, the source of the event seems to block on the event invocation if the pipeline is busy.

How can I set up a subscription that processes every newly emitted item (raised event) on its own task/thread, such that the source object never blocks on its internal DataChangedEvent.Invoke() call?

(Except of course the Subscribe() lambda which should execute on the UI thread - which is already the case.)

As a side note: @jonstodle mentioned in the #rxnet Slack channel that the TaskPoolScheduler might have different semantics than I assumed. Specifically, he said it probably creates one task and does both the subscribing and the producing of values in an event loop inside that one task. But if that's the case, I find it a bit strange that the first event invocation doesn't block (since the second one does). It seems to me that if the task pool task doing the subscription is asynchronous enough that the source doesn't have to block on the first invocation, there shouldn't be a need to make it block on the second call either?

Asked May 30 '17 17:05 by Daniel Rosenberg


1 Answer

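The issue you're hitting is simply the way Rx works: each value produced in a normal Rx pipeline is, well, pipelined, and only one value is processed at a time. If the source of an Rx pipeline (in your case the FromEventPattern<DataChangedEventArgs>) produces values faster than the observers handle them, the values back up in the pipeline: they are either queued or, where no operator introduces a queue, the producer is held up until the pipeline is free again.

The rule is that each observer in the pipeline will only process one value at a time. That happens for any scheduler, not just TaskPoolScheduler. Note also that SubscribeOn only controls where the subscription itself (attaching the event handler) runs; it does not move the delivery of each event onto a new task.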
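
To see that SubscribeOn does not hand each event to a new task, a minimal sketch along the following lines may help. It assumes the System.Reactive package and a console program with no SynchronizationContext; DemoStore and SubscribeOnSketch are hypothetical names made up for this illustration and are not part of the question's code.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical stand-in for the DataStore in the question.
public sealed class DemoStore
{
    public event EventHandler<EventArgs> Changed;
    public void Raise() => Changed?.Invoke(this, EventArgs.Empty);
}

public static class SubscribeOnSketch
{
    // Returns true when the observer ran on the thread that raised the event.
    public static bool HandlerRunsOnRaisingThread()
    {
        var store = new DemoStore();
        using var attached = new ManualResetEventSlim();
        int handlerThreadId = -1;

        using var subscription = Observable
            .FromEventPattern<EventArgs>(
                h => { store.Changed += h; attached.Set(); }, // runs on a task pool thread
                h => store.Changed -= h)
            .SubscribeOn(TaskPoolScheduler.Default)
            .Subscribe(_ => handlerThreadId = Thread.CurrentThread.ManagedThreadId);

        // Wait until the handler has actually been attached (assumes no
        // SynchronizationContext is installed on the calling thread).
        attached.Wait();

        // The event handler, and everything downstream of it, is invoked
        // synchronously by Raise() on the current thread.
        store.Raise();

        return handlerThreadId == Thread.CurrentThread.ManagedThreadId;
    }
}
```

In this sketch only the attachment of the handler happens on the task pool. Any slow work placed downstream of the event would therefore run on, and hold up, whichever thread raises the event.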

The way to make it work the way you want is quite simple: you create parallel pipelines and then merge the values back into a single pipeline.

Here's the change:

Observable
    .FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
    .SubscribeOn(TaskPoolScheduler.Default)
    .Select(x => x.EventArgs)
    .StartWith(new DataChangedEventArgs())
    .Throttle(TimeSpan.FromMilliseconds(25))
    .SelectMany(x =>
        Observable.Start(() =>
        {
            Thread.Sleep(5000); // Simulate long-running calculation.
            var result = 42;
            return result;
        }))
    .ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
    .Subscribe(result =>
    {
        // Do some interesting work with the result.
        // ...

        // Do something that makes the DataStore raise another event.
        dataStore.RaiseDataChangedEvent();
    });

The .SelectMany(x => Observable.Start(() => replaces the .Select(x =>. Each value now begins a new observable whose work starts running immediately, without waiting for earlier values to finish, and SelectMany then merges the results back into a single observable.

You may prefer to write it as the semantically identical .Select(x => Observable.Start(() => ...)).Merge().
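
As a rough sketch of that Select/Merge form (assuming the System.Reactive package; SelectMergeSketch and the 500 ms delay are made up for this illustration), three slow computations can be started side by side and their results collected:

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;

public static class SelectMergeSketch
{
    public static (IList<int> Results, TimeSpan Elapsed) Run()
    {
        var stopwatch = Stopwatch.StartNew();

        IList<int> results = Observable.Range(1, 3)
            // Each value becomes its own inner observable; Observable.Start
            // begins running the function right away on a background thread.
            .Select(x => Observable.Start(() =>
            {
                Thread.Sleep(500); // Simulate long-running calculation.
                return x * 2;
            }))
            // Merge flattens the inner observables back into one sequence,
            // emitting each result as soon as it is ready.
            .Merge()
            .ToList()
            .Wait(); // Block until all three results have arrived.

        return (results, stopwatch.Elapsed);
    }
}
```

The results are 2, 4 and 6 in whatever order the computations happen to finish. When the thread pool has threads to spare, the elapsed time should be much closer to one delay than to three, which is the point of running the pipelines in parallel.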

Here's a simple example that shows how this works:

var source = new Subject<int>();

source
    .SelectMany(x =>
        Observable.Start(() =>
        {
            Thread.Sleep(1000);
            return x * 2;
        }))
    .Subscribe(result =>
    {
        Console.WriteLine(result);
        source.OnNext(result);
        source.OnNext(result + 1);
    });

source.OnNext(1);

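It produces output like the following (the exact order varies from run to run, because the calculations run concurrently):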

2
4
6
14
12
8
10
24
28
30
26
16
20
22
18
48
50
56
52
58
60
62
54
32
34
46
44
40
42

Answered Oct 20 '22 06:10 by Enigmativity