I'm struggling with some concurrency issues related to subscribing to an Observable.FromEventPattern() on the TaskPoolScheduler.
Let me illustrate with a code example:
var dataStore = new DataStore();

Observable.FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
    .SubscribeOn(TaskPoolScheduler.Default)
    .Select(x => x.EventArgs)
    .StartWith(new DataChangedEventArgs())
    .Throttle(TimeSpan.FromMilliseconds(25))
    .Select(x =>
    {
        Thread.Sleep(5000); // Simulate long-running calculation.
        var result = 42;
        return result;
    })
    .ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
    .Subscribe(result =>
    {
        // Do some interesting work with the result.
        // ...
        // Do something that makes the DataStore raise another event.
        dataStore.RaiseDataChangedEvent(); // <- DEADLOCK!
    });

dataStore.RaiseDataChangedEvent(); // <- Returns immediately, i.e. does NOT wait for long-running calculation.
dataStore.RaiseDataChangedEvent(); // <- Blocks while waiting for the previous long-running calculation to complete, then returns eventually.
My issue is that when the original observable Observable.FromEventPattern() emits new items (i.e. when the DataStore object raises new DataChanged events), they appear to be blocked waiting for the previous items to finish flowing through the entire pipeline.
Since the subscribing is done on the TaskPoolScheduler, I had expected every new item emitted to simply spin up a new task, but the source of the event instead seems to block on the event invocation if the pipeline is busy.
How can I accomplish a subscription that executes every new emitted item (raised event) on its own task/thread, such that the source object never blocks on its internal DataChangedEvent.Invoke() call?
(Except, of course, the Subscribe() lambda, which should execute on the UI thread; that is already the case.)
As a side note: @jonstodle mentioned in the #rxnet Slack channel that the TaskPoolScheduler might have different semantics than what I assumed. Specifically, he said it probably creates one task and does both the subscribing and the producing of values in an event loop inside that one task. But if that's the case, then I find it a bit strange that the first event invocation doesn't block (since the second one does). It seems to me that if the task pool task doing the subscription is asynchronous enough that the source doesn't have to block on the first invocation, there shouldn't be a need to make it block on the second call either?
The FromEventPattern operator converts a .NET event to a sequence of EventPattern<TEventArgs>. Each EventPattern instance contains the event arguments and the object sending the event. The event arguments are provided in the EventArgs property of each EventPattern delivered in the sequence. The object sending the event is provided in the Sender property of the EventPattern instance. The return value is an observable sequence that contains data representations of invocations of the underlying .NET event.
The issue you're hitting is simply the way Rx works: each value produced in a normal Rx pipeline is, well, pipelined, and only one value is processed at a time. If the source of an Rx pipeline, in your case the FromEventPattern<DataChangedEventArgs>, produces values faster than the observers handle them, then they get queued in the pipeline.
The rule is that each observer in the pipeline will only process one value at a time. That happens for any scheduler, not just TaskPoolScheduler.
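To make the one-value-at-a-time rule concrete, here is a minimal sketch that is not part of the original code. It assumes the System.Reactive NuGet package is referenced and uses a Subject<int> as a stand-in for the event source; the class name and the 500 ms delay are chosen purely for illustration. Because no scheduler is introduced, each OnNext call runs the slow Select and the observer on the calling thread before returning, so the two calls together should take roughly a second.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class SerializedPipelineDemo
{
    static void Main()
    {
        // Stand-in for the event source (an assumption for this sketch).
        var source = new Subject<int>();

        source
            .Select(x =>
            {
                Thread.Sleep(500); // Simulate a slow step in the pipeline.
                return x * 2;
            })
            .Subscribe(result => Console.WriteLine($"Got {result}"));

        var stopwatch = Stopwatch.StartNew();
        source.OnNext(1); // Returns only after the slow Select and the observer have run.
        source.OnNext(2); // Starts only once the first value has been fully processed.
        Console.WriteLine($"Elapsed: {stopwatch.ElapsedMilliseconds} ms");
    }
}
```

The caller of OnNext plays the role of the object raising the event: it is held up for as long as the downstream work takes.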
The way to make it work the way you want is quite simple: you create parallel pipelines and then merge the values back into a single pipeline.
Here's the change:
Observable
    .FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
    .SubscribeOn(TaskPoolScheduler.Default)
    .Select(x => x.EventArgs)
    .StartWith(new DataChangedEventArgs())
    .Throttle(TimeSpan.FromMilliseconds(25))
    .SelectMany(x =>
        Observable.Start(() =>
        {
            Thread.Sleep(5000); // Simulate long-running calculation.
            var result = 42;
            return result;
        }))
    .ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
    .Subscribe(result =>
    {
        // Do some interesting work with the result.
        // ...
        // Do something that makes the DataStore raise another event.
        dataStore.RaiseDataChangedEvent();
    });
The .SelectMany(x => Observable.Start(() => replaces the .Select(x =>, so each value begins a new observable subscription that runs immediately, and the results are then merged back into a single observable.
You may prefer to write it as the semantically identical .Select(x => Observable.Start(() => ...)).Merge().
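For reference, the Select/Merge spelling might look like the sketch below. It is an illustration only, not the original pipeline: it assumes the System.Reactive package, uses a Subject<int> in place of the event source, and adds a CountdownEvent (not in the original) just to keep the console process alive until three results have arrived. The results can arrive in any order, since each Observable.Start call runs its work independently.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class SelectMergeDemo
{
    static void Main()
    {
        // Stand-in for the event source (an assumption for this sketch).
        var source = new Subject<int>();
        // Illustration-only helper: lets Main wait for the three results.
        var done = new CountdownEvent(3);

        source
            // Project each value to an inner observable that starts its work right away.
            .Select(x => Observable.Start(() =>
            {
                Thread.Sleep(500); // Simulate long-running calculation.
                return x * 2;
            }))
            // Flatten the inner observables back into one sequence of results.
            .Merge()
            .Subscribe(result =>
            {
                Console.WriteLine(result);
                done.Signal();
            });

        // None of these calls waits for the simulated calculation.
        source.OnNext(1);
        source.OnNext(2);
        source.OnNext(3);

        done.Wait();
    }
}
```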
Here's a simple example that shows how this works:
var source = new Subject<int>();

source
    .SelectMany(x =>
        Observable.Start(() =>
        {
            Thread.Sleep(1000);
            return x * 2;
        }))
    .Subscribe(result =>
    {
        Console.WriteLine(result);
        source.OnNext(result);
        source.OnNext(result + 1);
    });

source.OnNext(1);
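It produces output like the following (the exact order varies between runs because the computations run concurrently, and the sequence keeps going because each result feeds two new values back into the source):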
2 4 6 14 12 8 10 24 28 30 26 16 20 22 18 48 50 56 52 58 60 62 54 32 34 46 44 40 42