Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Using Rx, take the last fired event every 60 seconds

I have an event that fires (potentially) every second (a timer tick). What I'd like to do is take the last tick's EventArgs value every 60 seconds (if there were new ticks during those 60 seconds). I tried doing something like:

Observable.FromEventPattern(
    h => myThing.SomeEvent += h,
    h => myThing.SomeEvent -= h)
        .Throttle(TimeSpan.FromSeconds(60))
        .TakeLast(1)
        .Do(p => // do something with p);

Unfortunately I never get to the Do method, and I'm not sure what I'm doing wrong?

Apologies if this is completely wrong, my first time using Rx :)

Edit: Per suggestions below, I changed Do to Subscribe, but I still don't get any results, if I use Throttle. Should I be using it at all?

like image 981
Igal Tabachnik Avatar asked Jul 25 '12 18:07

Igal Tabachnik


4 Answers

I think this is being over engineered, you just need to use Sample, it does exactly what you want:

source.Sample(TimeSpan.FromSeconds(60)).Subscribe(x => /* .. */);
like image 197
yamen Avatar answered Nov 18 '22 11:11

yamen


Do this:

Observable.FromEventPattern(h => myThing.SomeEvent += h, h => myThing.SomeEvent -= h)
    .Window(TimeSpan.FromSeconds(60))
    .SelectMany(x => x.TakeLast())
    .Subscribe(x => /* ... */);
like image 21
Ana Betts Avatar answered Nov 18 '22 11:11

Ana Betts


Given that snippet, nothing will happen until you Subscribe to the Observable. IIRC, the event is only wired up when someone calls Subscribe (it's er, un-wired when you Dispose the subscription).

I try to avoid using Do. An observable is a stream of values, and you should really handle those values (and the stream ending, or faulting) via the methods passed in during Subscribe. Do then becomes a side effect - something you do as the values whizz by on their way to an OnNext method.

like image 3
citizenmatt Avatar answered Nov 18 '22 10:11

citizenmatt


Well Throttle won't solve the problem because..

The way throttle works is that it waits for a pause for the given amount of time, let's say you get an event every second and your Throttle is 1 minute, you will never see anything. Unless something happens with your stream and all of a sudden there is a pause for over 1 minute.

One of the most used examples for Throttle is using it for an AutoComplete box, where you don't want to fire a request for the filtered list on every keystroke, so you set a throttle for let's say 300ms, so when ever the user stops typing for at least 300ms you will fire of the search.

I wonder if your solution would benefit from buffer or window instead...I bet that Paul can figure out something way more awesome than my brain can at the moment. Haven't been using Rx much for a couple of months now :(

like image 2
cyberzed Avatar answered Nov 18 '22 11:11

cyberzed