I am looking for something similar to the exhaustMap
operator from rxjs
, but RX.NET
does not seem to have such an operator.
What I need to achieve is that, upon every element of the source stream, I need to start an async
handler, and until it finishes, I would like to drop any elements from the source. As soon as the handler finishes, resume taking elements.
What I don't want is to start an async handler upon every element - while the handler runs, I want to drop source elements.
I also suspect I need to cleverly use the defer operator here?
Thank you!
Here is an implementation of the ExhaustMap
operator. The source observable is projected to an IObservable<Task<TResult>>
, where each subsequent task is either the previous one if it's still running, or otherwise a new task associated with the current item. Repeated occurrences of the same task are then removed with the DistinctUntilChanged
operator, and finally the observable is flattened with the Concat
operator.
/// <summary>Invokes an asynchronous function for each element of an observable
/// sequence, ignoring elements that are emitted before the completion of an
/// asynchronous function of a preceding element.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, Task<TResult>> function)
{
return source
.Scan(Task.FromResult<TResult>(default), (previousTask, item) =>
{
return !previousTask.IsCompleted ? previousTask : HideIdentity(function(item));
})
.DistinctUntilChanged()
.Concat();
async Task<TResult> HideIdentity(Task<TResult> task) => await task;
}
The tasks returned by the function
are not guaranteed to be distinct, hence the need for the HideIdentity
local function that returns distinct wrappers of the tasks.
Usage example:
Observable
.Interval(TimeSpan.FromMilliseconds(200))
.Select(x => (int)x + 1)
.Take(10)
.Do(x => Console.WriteLine($"Input: {x}"))
.ExhaustMap(async x => { await Task.Delay(x % 3 == 0 ? 500 : 100); return x; })
.Do(x => Console.WriteLine($"Result: {x}"))
.Wait();
Output:
Input: 1
Result: 1
Input: 2
Result: 2
Input: 3
Input: 4
Input: 5
Result: 3
Input: 6
Input: 7
Input: 8
Result: 6
Input: 9
Input: 10
Result: 9
Update: Here is an alternative implementation, where the function
produces an IObservable<TResult>
instead of a Task<TResult>
:
/// <summary>Projects each element to an observable sequence, which is merged
/// in the output observable sequence only if the previous projected observable
/// sequence has completed.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
this IObservable<TSource> source,
Func<TSource, IObservable<TResult>> function)
{
return Observable.Defer(() =>
{
int mutex = 0; // 0: not acquired, 1: acquired
return source.SelectMany(item =>
{
// Attempt to acquire the mutex immediately. If successful, return
// a sequence that releases the mutex when terminated. Otherwise,
// return immediately an empty sequence.
if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
return function(item).Finally(() => Volatile.Write(ref mutex, 0));
return Observable.Empty<TResult>();
});
});
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With