
Create observable from periodic async request

I want a generic way to convert an asynchronous method to an observable. In my case, I'm dealing with methods that use HttpClient to fetch data from an API.

Let's say we have the method Task<string> GetSomeData() that needs to become a single IObservable<string> where the values are generated by a combination of:

  • Repeated periodic calls to GetSomeData() (for example every x seconds)
  • Manually triggered calls to GetSomeData() at any given time (for example when user hits refresh).

Since there are two ways to trigger execution of GetSomeData(), concurrency can be an issue. To avoid demanding that GetSomeData() is thread-safe, I want to limit the concurrency so that only one thread executes the method at a time. As a consequence, I need to handle overlapping requests with some strategy. I made a (kind of) marble diagram to describe the problem and the wanted outcome:

[Image: marble diagram]

My instinct tells me there is a simple way to achieve this, so please give me some insights :)

This is the solution I've got so far. It unfortunately doesn't solve the concurrency problem.

    public class ObservableCreationWrapper<T>
    {
        private Subject<Unit> _manualCallsSubject = new Subject<Unit>();
        private Func<Task<T>> _methodToCall;
        private IObservable<T> _manualCalls;

        public IObservable<T> Stream { get; private set; }

        public ObservableCreationWrapper(Func<Task<T>> methodToCall, TimeSpan period)
        {
            _methodToCall = methodToCall;
            _manualCalls = _manualCallsSubject.AsObservable()
                .Select(x => Observable.FromAsync(x => methodToCall()))
                .Merge(1);

            Stream = Observable.FromAsync(() => _methodToCall())
                .DelayRepeat(period)
                .Merge(_manualCalls);
        }

        public void TriggerAdditionalCall()
        {
            _manualCallsSubject.OnNext(Unit.Default);
        }
    }

Extension method for repeating with delay:

static class Extensions
{
    public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) => source
        .Concat(
            Observable.Create<T>(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();
}

An example of a service containing the method to generate the observable

class SomeService
{
    private int _ticks = 0;

    public async Task<string> GetSomeValueAsync()
    {
        //Just a hack to determine if the request was triggered manually or by timer
        var initiatationWay = (new StackTrace()).GetFrame(4).GetMethod().ToString().Contains("System.Threading.CancellationToken") ? "manually" : "by timer";

        //Here we have a data race! We would like to limit access to this method 
        var valueToReturn = $"{_ticks} ({initiatationWay})";

        await Task.Delay(500);
        _ticks += 1; 
        return valueToReturn;
    }
}

Used like this (data race will occur):

static async Task Main(string[] args)
{
    //Running this program will yield non deterministic results due to data-race in GetSomeValueAsync
    var someService = new SomeService();
    var stopwatch = Stopwatch.StartNew();
    var observableWrapper = new ObservableCreationWrapper<string>(someService.GetSomeValueAsync, TimeSpan.FromMilliseconds(2000));
    observableWrapper.Stream
        .Take(6)
        .Subscribe(x => 
            {
                Console.WriteLine($"{stopwatch.ElapsedMilliseconds} | Request: {x} finished");
            });

    await Task.Delay(4000);
    observableWrapper.TriggerAdditionalCall();
    observableWrapper.TriggerAdditionalCall();
    Console.ReadLine();
}
figursagsmats asked Nov 03 '20 08:11


1 Answer

Here is my take on this problem:


Update: I was able to greatly simplify my suggested solution by borrowing ideas from Enigmativity's answer. The Observable.StartAsync method handles the messy business of cancellation automatically¹, and the requirement of non-overlapping execution can be enforced simply by using a SemaphoreSlim.

/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// function that is invoked periodically and manually. Overlapping invocations
/// are prevented. Timer ticks that would cause overlapping are ignored.
/// Manual invocations cancel previous invocations, and restart the timer.
/// </summary>
public static IObservable<T> PeriodicAndManual<T>(
    Func<bool, CancellationToken, Task<T>> functionAsync,
    TimeSpan period,
    out Action manualInvocation)
{
    // Arguments validation omitted
    var manualSubject = new Subject<bool>();
    manualInvocation = () => manualSubject.OnNext(true);
    return Observable.Defer(() =>
    {
        var semaphore = new SemaphoreSlim(1, 1); // Ensure no overlapping
        return Observable
            .Interval(period)
            .Select(_ => false) // Not manual
            .Merge(manualSubject)
            .TakeUntil(isManual => isManual) // Stop on first manual
            .Repeat() // ... and restart the timer
            .Prepend(false) // Skip the initial interval delay
            .Select(isManual =>
            {
                if (isManual)
                {
                    // Triggered manually
                    return Observable.StartAsync(async ct =>
                    {
                        await semaphore.WaitAsync(ct);
                        try { return await functionAsync(isManual, ct); }
                        finally { semaphore.Release(); }
                    });
                }
                else if (semaphore.Wait(0))
                {
                    // Triggered by the timer and semaphore acquired synchronously
                    return Observable
                        .StartAsync(ct => functionAsync(isManual, ct))
                        .Finally(() => semaphore.Release());
                }
                return null; // Otherwise ignore the signal
            })
            .Where(op => op != null)
            .Switch(); // Pending operations are unsubscribed and canceled
    });
}
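
To see the two semaphore paths in isolation from the Rx plumbing, here is a minimal sketch that uses only the base class library. The helper names TryRunAsync and RunWhenFreeAsync are hypothetical and are not part of the method above. The first mirrors the timer path (skip the call if the gate is taken), the second mirrors the manual path (wait for the gate). One difference to keep in mind: in the method above the timer path releases the semaphore through the Finally operator, whereas this sketch uses a plain try/finally.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreGateDemo
{
    // Hypothetical helper: runs `work` only if the gate is free right now.
    // Returns false when the call is skipped because the gate is already held.
    public static async Task<bool> TryRunAsync(SemaphoreSlim gate, Func<Task> work)
    {
        if (!gate.Wait(0)) return false; // Non-blocking attempt, like a timer tick
        try { await work(); return true; }
        finally { gate.Release(); }
    }

    // Hypothetical helper: waits asynchronously for the gate, then runs `work`.
    public static async Task RunWhenFreeAsync(
        SemaphoreSlim gate, Func<Task> work, CancellationToken ct = default)
    {
        await gate.WaitAsync(ct); // Queues behind the current holder, like a manual call
        try { await work(); }
        finally { gate.Release(); }
    }

    public static async Task<List<string>> RunDemoAsync()
    {
        var gate = new SemaphoreSlim(1, 1); // At most one holder at a time
        var log = new List<string>();
        var release = new TaskCompletionSource<bool>();

        // The first call acquires the gate and stays inside until `release` is set.
        Task<bool> first = TryRunAsync(gate, async () =>
        {
            log.Add("first started");
            await release.Task;
        });

        // A timer-like call finds the gate taken and is skipped.
        bool secondRan = await TryRunAsync(gate, () =>
        {
            log.Add("second started");
            return Task.CompletedTask;
        });
        log.Add($"second ran: {secondRan}");

        // A manual-like call waits for the gate instead of being dropped.
        Task third = RunWhenFreeAsync(gate, () =>
        {
            log.Add("third started");
            return Task.CompletedTask;
        });
        log.Add($"third completed early: {third.IsCompleted}");

        release.SetResult(true); // Let the first call finish and release the gate
        await first;
        await third;
        return log;
    }

    public static async Task Main()
    {
        foreach (var line in await RunDemoAsync()) Console.WriteLine(line);
    }
}
```

The log shows that the second call never starts, while the third call starts only after the first one has released the gate. Passing a CancellationToken to RunWhenFreeAsync lets a pending waiter be abandoned, which is what Switch relies on when a newer invocation replaces an older one.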

The out Action manualInvocation argument is the mechanism that triggers a manual invocation.

Usage example:

int ticks = 0;
var subscription = PeriodicAndManual(async (isManual, token) =>
{
    var id = $"{++ticks} " + (isManual ? "manual" : "periodic");
    Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Begin {id}");
    await Task.Delay(500, token);
    return id;
}, TimeSpan.FromMilliseconds(1000), out var manualInvocation)
.Do(x => Console.WriteLine($"{DateTime.Now:HH:mm:ss.fff} Received #{x}"))
.Subscribe();

await Task.Delay(3200);
manualInvocation();
await Task.Delay(200);
manualInvocation();
await Task.Delay(3200);

subscription.Dispose();

Output:

19:52:43.684 Begin 1 periodic
19:52:44.208 Received #1 periodic
19:52:44.731 Begin 2 periodic
19:52:45.235 Received #2 periodic
19:52:45.729 Begin 3 periodic
19:52:46.232 Received #3 periodic
19:52:46.720 Begin 4 periodic
19:52:46.993 Begin 5 manual
19:52:47.220 Begin 6 manual
19:52:47.723 Received #6 manual
19:52:48.223 Begin 7 periodic
19:52:48.728 Received #7 periodic
19:52:49.227 Begin 8 periodic
19:52:49.730 Received #8 periodic
19:52:50.226 Begin 9 periodic

The technique of using the Scan and DistinctUntilChanged operators to drop elements while the previous asynchronous operation is running is borrowed from this question.

¹ It seems that the Rx library does not handle this messy business satisfactorily though, since it just omits disposing of the CancellationTokenSources it creates.

Theodor Zoulias answered Oct 23 '22 19:10