Simple question, I would hope: I'm writing an application in which I want to retrieve data from a database; I've elected to use Rx for this purpose to represent the database as a sequence of values.
I only want to poll the database (and thus have my observer's notifications occur) at a maximum of once every 5 seconds. Right now, I have something like this, where the Scheduler is scheduling a periodic task that causes my observer to be subscribed to the observable that is my database:
_scheduler.SchedulePeriodic(_repository, TimeSpan.FromSeconds(5),
(repo) => repo.AsObservable()
.Where(item => _SomeFilter(item))
.Subscribe(item => _SomeProcessFunction(item))
);
Function names and the like omitted for brevity; repo.AsObservable()
is simply a function that returns an IObservable<T>
of all the items inside the repository at that point.
Now, I figure that this is the correct way of doing things, however before I came up with this solution I did come up with a different solution in which I had an Observable.Timer
with the subscribed observer would subscribe to the AsObservable()
return value every timer tick instead.
My question is that this seems very.. odd - why am I subscribing multiple times to the observable?
Sorry if this question is confusing, it confused me while writing it, however the schedulers are also confusing for me :P
What if you use the built in operators instead of manually scheduling tasks?
repo.AsObservable()
.Where(_SomeFilter)
// Wait 5 seconds before completing
.Concat(Observable.Empty<T>().Delay(TimeSpan.FromSeconds(5))
// Resubscribe indefinitely after source completes
.Repeat()
// Subscribe
.Subscribe(_SomeProcessFunction);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With