
What is the correct way to schedule periodic events in Rx?

Simple question, I would hope: I'm writing an application in which I want to retrieve data from a database; I've elected to use Rx for this purpose to represent the database as a sequence of values.

I only want to poll the database (and thus have my observer's notifications occur) at a maximum of once every 5 seconds. Right now, I have something like this, where the Scheduler is scheduling a periodic task that causes my observer to be subscribed to the observable that is my database:

_scheduler.SchedulePeriodic(_repository, TimeSpan.FromSeconds(5),
    (repo) => repo.AsObservable()
        .Where(item => _SomeFilter(item))
        .Subscribe(item => _SomeProcessFunction(item))
);

Function names and the like omitted for brevity; repo.AsObservable() is simply a function that returns an IObservable<T> of all the items inside the repository at that point.

Now, I figure that this is the correct way of doing things. However, before arriving at this solution I tried a different one: an Observable.Timer whose observer subscribed to the AsObservable() return value on every timer tick.

My question is that this seems very odd: why am I subscribing multiple times to the observable?

Sorry if this question is confusing, it confused me while writing it, however the schedulers are also confusing for me :P

Dan asked Mar 19 '23 07:03


1 Answer

What if you use the built in operators instead of manually scheduling tasks?

repo.AsObservable()
    .Where(_SomeFilter)
    // Wait 5 seconds before completing
    .Concat(Observable.Empty<T>().Delay(TimeSpan.FromSeconds(5)))
    // Resubscribe indefinitely after source completes
    .Repeat()
    // Subscribe
    .Subscribe(_SomeProcessFunction);
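
For comparison, here is a rough sketch of the timer-driven variant mentioned in the question, written as one pipeline instead of a subscription inside a scheduled callback. It assumes the System.Reactive package and that the repository observable is cold (each subscription re-reads the data, which is why polling implies resubscribing). QueryRepository, the even-number filter, and the sample values are hypothetical stand-ins for repo.AsObservable(), _SomeFilter, and the real data.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

class Program
{
    // Hypothetical stand-in for repo.AsObservable(): Defer re-runs the
    // factory on every subscription, so each poll sees a fresh snapshot.
    static IObservable<int> QueryRepository() =>
        Observable.Defer(() => new List<int> { 1, 2, 3, 4 }.ToObservable());

    static void Main()
    {
        var subscription = Observable
            // Tick immediately, then once every 5 seconds.
            .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(5))
            // Each tick subscribes to a fresh repository sequence.
            .SelectMany(_ => QueryRepository())
            // Placeholder for _SomeFilter.
            .Where(item => item % 2 == 0)
            // Placeholder for _SomeProcessFunction.
            .Subscribe(item => Console.WriteLine(item));

        Console.ReadLine();      // keep the process alive while polling
        subscription.Dispose();  // stop polling
    }
}
```

The two approaches differ in timing: the timer ticks on a fixed 5-second period regardless of how long a query takes, so slow queries could overlap, whereas Concat plus Repeat waits 5 seconds after each query completes before starting the next.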

cwharris answered Apr 08 '23 13:04