Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

With Rx, how do I ignore all-except-the-latest value when my Subscribe method is running

Using Reactive Extensions, I want to ignore messages coming from my event stream that occur while my Subscribe method is running. I.e. it sometimes takes me longer to process a message than the time between message, so I want to drop the messages I don't have time to process.

However, when my Subscribe method completes, if any messages did come through I want to process the last one. So I always process the most recent message.

So, if I have some code which does:

messages.OnNext(100);
messages.OnNext(1);
messages.OnNext(2);

and if we assume the '100' takes a long time to process. Then I want the '2' to be processed when the '100' completes. The '1' should be ignored because it was superseded by the '2' while the '100' was still being processed.

Here's an example of the result I want using a background task and Latest()

var messages = Observable.Interval(TimeSpan.FromMilliseconds(100));

Task.Factory.StartNew(() =>
{
    foreach(var n in messages.Latest())
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    }
});

However, Latest() is a blocking call and I'd prefer not to have a thread sitting waiting for the next value like this (there will sometimes be very long gaps between messages).

I can also get the result I want by using a BroadcastBlock from TPL Dataflow, like this:

var buffer = new BroadcastBlock<long>(n => n);
Observable.Interval(TimeSpan.FromMilliseconds(100)).Subscribe(n => buffer.Post(n));

buffer.AsObservable()
    .Subscribe(n =>
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(250));
        Console.WriteLine(n);
    });

but this feels like it should be possible directly in Rx. What's the best way to go about doing it?

like image 621
Wilka Avatar asked Jun 13 '12 08:06

Wilka


2 Answers

Here is a method that is similar to Dave's but uses Sample instead (which is more appropriate than buffer). I've included a similar extension method to the one I added to Dave's answer.

The extension:

public static IDisposable SubscribeWithoutOverlap<T>(this IObservable<T> source, Action<T> action)
{
    var sampler = new Subject<Unit>();

    var sub = source.
        Sample(sampler).
        ObserveOn(Scheduler.ThreadPool).
        Subscribe(l =>
        {
            action(l);
            sampler.OnNext(Unit.Default);
        });

    // start sampling when we have a first value
    source.Take(1).Subscribe(_ => sampler.OnNext(Unit.Default));

    return sub;
}

Note that it's simpler, and there is no 'empty' buffer that's fired. The first element that is sent to the action actually comes from the stream itself.

Usage is straightforward:

messages.SubscribeWithoutOverlap(n =>
{
    Console.WriteLine("start: " + n);
    Thread.Sleep(500);
    Console.WriteLine("end: " + n);
});

messages.Subscribe(x => Console.WriteLine("source: " + x)); // for testing

And results:

source: 0
start: 0
source: 1
source: 2
source: 3
source: 4
source: 5
end: 0
start: 5
source: 6
source: 7
source: 8
source: 9
source: 10
end: 5
start: 10
source: 11
source: 12
source: 13
source: 14
source: 15
end: 10
like image 80
yamen Avatar answered Nov 03 '22 16:11

yamen


Thanks to Lee Campbell (of Intro To Rx fame), I now have a working solution using this extension method:

public static IObservable<T> ObserveLatestOn<T>(this IObservable<T> source, IScheduler scheduler)
{
    return Observable.Create<T>(observer =>
    {
        Notification<T> outsideNotification = null;
        var gate = new object();
        bool active = false;
        var cancelable = new MultipleAssignmentDisposable();
        var disposable = source.Materialize().Subscribe(thisNotification =>
        {
            bool alreadyActive;
            lock (gate)
            {
                alreadyActive = active;
                active = true;
                outsideNotification = thisNotification;
            }

            if (!alreadyActive)
            {
                cancelable.Disposable = scheduler.Schedule(self =>
                {
                    Notification<T> localNotification = null;
                    lock (gate)
                    {
                        localNotification = outsideNotification;
                        outsideNotification = null;
                    }
                    localNotification.Accept(observer);
                    bool hasPendingNotification = false;
                    lock (gate)
                    {
                        hasPendingNotification = active = (outsideNotification != null);
                    }
                    if (hasPendingNotification)
                    {
                        self();
                    }
                });
            }
        });
        return new CompositeDisposable(disposable, cancelable);
    });
}
like image 4
Wilka Avatar answered Nov 03 '22 16:11

Wilka