I want a generic way to convert an asynchronous method to an observable. In my case, I'm dealing with methods that use HttpClient to fetch data from an API.
Let's say we have the method Task<string> GetSomeData() that needs to become a single IObservable<string>, where the values are generated as a combination of:
- Repeated periodic calls to GetSomeData() (for example every x seconds)
- Manually triggered calls to GetSomeData() at any given time (for example when the user hits refresh)
Since there are two ways to trigger execution of GetSomeData(), concurrency can be an issue. To avoid demanding that GetSomeData() is thread-safe, I want to limit the concurrency so that only one thread is executing the method at a time. As a consequence, I need to handle overlapping requests with some strategy. I made a (kind of) marble diagram trying to describe the problem and the wanted outcome.
My instinct tells me there is a simple way to achieve this, so please give me some insights :)
This is the solution I've got so far. It unfortunately doesn't solve the concurrency problem.
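using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public class ObservableCreationWrapper<T>
{
    private readonly Subject<Unit> _manualCallsSubject = new Subject<Unit>();
    private readonly Func<Task<T>> _methodToCall;
    private readonly IObservable<T> _manualCalls;
    public IObservable<T> Stream { get; private set; }
    public ObservableCreationWrapper(Func<Task<T>> methodToCall, TimeSpan period)
    {
        _methodToCall = methodToCall;
        // Manual calls are serialized among themselves by Merge(1).
        // The unused 'ct' parameter selects the CancellationToken overload of FromAsync.
        _manualCalls = _manualCallsSubject.AsObservable()
            .Select(_ => Observable.FromAsync(ct => methodToCall()))
            .Merge(1);
        // The periodic calls are merged in without any concurrency limit,
        // so a manual call can still overlap with a periodic one.
        Stream = Observable.FromAsync(() => _methodToCall())
            .DelayRepeat(period)
            .Merge(_manualCalls);
    }
    public void TriggerAdditionalCall()
    {
        _manualCallsSubject.OnNext(Unit.Default);
    }
}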
Extension method for repeating with delay:
static class Extensions
{
    // Repeats the source, waiting for 'delay' after each completion before resubscribing
    public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) => source
        .Concat(
            Observable.Create<T>(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();
}
An example of a service containing the method to generate the observable:
class SomeService
{
    private int _ticks = 0;
    public async Task<string> GetSomeValueAsync()
    {
        // Just a hack to determine whether the request was triggered manually or by the timer
        var initiationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";
        // Here we have a data race! We would like to limit access to this method
        var valueToReturn = $"{_ticks} ({initiationWay})";
        await Task.Delay(500);
        _ticks += 1;
        return valueToReturn;
    }
}
Used like this (a data race will occur):
static async Task Main(string[] args)
{
    // Running this program will yield non-deterministic results due to the data race in GetSomeValueAsync
    var someService = new SomeService();
    var stopwatch = Stopwatch.StartNew();
    var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync, TimeSpan.FromMilliseconds(2000));
    observableWrapper.Stream
        .Take(6)
        .Subscribe(x =>
        {
            Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} finished");
        });
    await Task.Delay(4000);
    observableWrapper.TriggerAdditionalCall();
    observableWrapper.TriggerAdditionalCall();
    Console.ReadLine();
}
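For illustration, the wrapper above still races because Merge(1) only serializes the manual calls among themselves, while the periodic calls run alongside them. A minimal sketch of one possible strategy (queueing every request) funnels both triggers into a single stream before invoking the method. The SerializedCalls class and its manualTriggers parameter are hypothetical names, and Observable.Interval is assumed as the timer here, which ticks at a fixed rate rather than waiting after each completion like DelayRepeat does.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class SerializedCalls
{
    // Hypothetical helper: 'manualTriggers' plays the role of _manualCallsSubject.
    public static IObservable<T> Create<T>(
        Func<Task<T>> methodToCall, TimeSpan period, IObservable<Unit> manualTriggers)
    {
        return Observable
            .Interval(period)
            .Select(_ => Unit.Default)
            .StartWith(Unit.Default)   // Call once immediately
            .Merge(manualTriggers)     // Timer and manual triggers share one stream
            // FromAsync is deferred: the method runs only when subscribed to
            .Select(_ => Observable.FromAsync(methodToCall))
            // Concat subscribes to one inner sequence at a time, in order,
            // so invocations of methodToCall never overlap
            .Concat();
    }
}
```

With this strategy overlapping requests are queued, not dropped, so the backlog grows if the method is consistently slower than the period.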
Here is my take on this problem:
Update: I was able to greatly simplify my suggested solution by borrowing ideas from Enigmativity's answer. The Observable.StartAsync method handles the messy business of cancellation automatically¹, and the requirement of non-overlapping execution can be enforced simply by using a SemaphoreSlim.
/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// function that is invoked periodically and manually. Overlapping invocations
/// are prevented. Timer ticks that would cause overlapping are ignored.
/// Manual invocations cancel previous invocations, and restart the timer.
/// </summary>
public static IObservable<T> PeriodicAndManual<T>(
    Func<bool, CancellationToken, Task<T>> functionAsync,
    TimeSpan period,
    out Action manualInvocation)
{
    // Arguments validation omitted
    var manualSubject = new Subject<bool>();
    manualInvocation = () => manualSubject.OnNext(true);
    return Observable.Defer(() =>
    {
        var semaphore = new SemaphoreSlim(1, 1); // Ensure no overlapping
        return Observable
            .Interval(period)
            .Select(_ => false) // Not manual
            .Merge(manualSubject)
            .TakeUntil(isManual => isManual) // Stop on first manual
            .Repeat() // ... and restart the timer
            .Prepend(false) // Skip the initial interval delay
            .Select(isManual =>
            {
                if (isManual)
                {
                    // Triggered manually
                    return Observable.StartAsync(async ct =>
                    {
                        await semaphore.WaitAsync(ct);
                        try { return await functionAsync(isManual, ct); }
                        finally { semaphore.Release(); }
                    });
                }
                else if (semaphore.Wait(0))
                {
                    // Triggered by the timer and semaphore acquired synchronously
                    return Observable
                        .StartAsync(ct => functionAsync(isManual, ct))
                        .Finally(() => semaphore.Release());
                }
                return null; // Otherwise ignore the signal
            })
            .Where(op => op != null)
            .Switch(); // Pending operations are unsubscribed and canceled
    });
}
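The two ways the semaphore is acquired in the method above (Wait(0) for timer ticks, WaitAsync for manual invocations) can also be seen in isolation, without Rx. The following is only a minimal sketch under that reading. The Gate class and its TryRunAsync and RunQueuedAsync methods are hypothetical names, not part of the solution.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper illustrating the two acquisition styles.
public sealed class Gate
{
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(1, 1);

    // "Skip if busy": Wait(0) never blocks. It returns false when the
    // semaphore is already held, so the caller can drop the request.
    public async Task<bool> TryRunAsync(Func<Task> action)
    {
        if (!_semaphore.Wait(0)) return false;
        try { await action(); return true; }
        finally { _semaphore.Release(); }
    }

    // "Wait for your turn": WaitAsync completes once the semaphore is free,
    // or throws OperationCanceledException if the token is canceled first.
    public async Task RunQueuedAsync(Func<Task> action, CancellationToken ct = default)
    {
        await _semaphore.WaitAsync(ct);
        try { await action(); }
        finally { _semaphore.Release(); }
    }
}
```

The first style matches the "timer ticks that would cause overlapping are ignored" rule, the second matches a manual invocation that waits until the running operation has released the semaphore.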
The out Action manualInvocation argument is the mechanism that triggers a manual invocation.
Usage example:
int ticks = 0;
var subscription = PeriodicAndManual(async (isManual, token) =>
{
    var id = $"{++ticks} " + (isManual ? "manual" : "periodic");
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Begin {id}");
    await Task.Delay(500, token);
    return id;
}, TimeSpan.FromMilliseconds(1000), out var manualInvocation)
    .Do(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Received #{x}"))
    .Subscribe();
await Task.Delay(3200);
manualInvocation();
await Task.Delay(200);
manualInvocation();
await Task.Delay(3200);
subscription.Dispose();
Output:
19:52:43.684 Begin 1 periodic
19:52:44.208 Received #1 periodic
19:52:44.731 Begin 2 periodic
19:52:45.235 Received #2 periodic
19:52:45.729 Begin 3 periodic
19:52:46.232 Received #3 periodic
19:52:46.720 Begin 4 periodic
19:52:46.993 Begin 5 manual
19:52:47.220 Begin 6 manual
19:52:47.723 Received #6 manual
19:52:48.223 Begin 7 periodic
19:52:48.728 Received #7 periodic
19:52:49.227 Begin 8 periodic
19:52:49.730 Received #8 periodic
19:52:50.226 Begin 9 periodic
The technique of using the Scan and the DistinctUntilChanged operators in order to drop elements while the previous asynchronous operation is running is borrowed from this question.
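As a rough sketch of how Scan and DistinctUntilChanged can be combined to drop elements while an operation is running: the running task is carried along as the accumulator, and it is re-emitted unchanged for as long as it is incomplete, so DistinctUntilChanged filters those repeats out. This is my own reconstruction of the idea, not necessarily the code from the linked question, and DropWhileBusy is a hypothetical operator name.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class DropWhileBusyExtensions
{
    // Hypothetical operator: starts 'function' for an element only if the
    // previously started task has completed. Otherwise the element is dropped.
    public static IObservable<TResult> DropWhileBusy<TSource, TResult>(
        this IObservable<TSource> source, Func<TSource, Task<TResult>> function)
    {
        return source
            // Carry the latest task along. While it is still running,
            // re-emit the same instance instead of starting a new one.
            .Scan(Task.FromResult(default(TResult)), (previous, item) =>
                previous.IsCompleted ? Wrap(function(item)) : previous)
            // Re-emitted instances are consecutive duplicates and are removed here
            .DistinctUntilChanged()
            // Unwrap the result of each task, preserving order
            .Concat();

        // Guarantees a fresh Task instance even if 'function' returns a cached one
        async Task<TResult> Wrap(Task<TResult> task) => await task;
    }
}
```

Unlike the Switch-based method above, nothing is canceled here: a running operation always finishes, and elements arriving in the meantime are simply ignored.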
¹ It seems that the Rx library does not handle this messy business satisfactorily though, since it just omits disposing of the CancellationTokenSources it creates.