Merging two Observables with one taking higher priority

Is it possible to use Reactive Extensions to achieve the following:

  • Two Observables, one of which is "High" priority and the other "Low"

  • Merging both Observables into one, which can then be subscribed to, with the intention that the resulting Observable will always emit high priority items ahead of any low priority ones.

I understand that this could be implemented more trivially using two ConcurrentQueue collections and something like this:

return this.highPriorityItems.TryDequeue(out item) 
    || this.lowPriorityItems.TryDequeue(out item);

But this approach has problems like not being "subscribable" in the same manner that an Observable would be (so once the queues are depleted, processing would end without a lot of extra guff to push this off into a Task).
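For reference, the two-queue idea above can be sketched as a small self-contained program. This is only an illustration under assumptions: the `TwoLevelQueue<T>` type, its `EnqueueHigh`/`EnqueueLow` methods and the sample items are hypothetical names, not part of any library.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper (not from any library): two queues, polled high first.
public sealed class TwoLevelQueue<T>
{
    private readonly ConcurrentQueue<T> highPriorityItems = new ConcurrentQueue<T>();
    private readonly ConcurrentQueue<T> lowPriorityItems = new ConcurrentQueue<T>();

    public void EnqueueHigh(T item) => highPriorityItems.Enqueue(item);
    public void EnqueueLow(T item) => lowPriorityItems.Enqueue(item);

    // Short-circuit ||: the low queue is only consulted when the high queue is empty.
    public bool TryDequeue(out T item) =>
        highPriorityItems.TryDequeue(out item)
        || lowPriorityItems.TryDequeue(out item);
}

public static class TwoQueueDemo
{
    public static void Main()
    {
        var queue = new TwoLevelQueue<string>();
        queue.EnqueueLow("low-1");
        queue.EnqueueHigh("high-1");
        queue.EnqueueLow("low-2");

        // Polling loop: it ends as soon as both queues are empty,
        // which is the "not subscribable" limitation described above.
        while (queue.TryDequeue(out var item))
            Console.WriteLine(item);
    }
}
```

The loop drains the high-priority queue before touching the low-priority one, but nothing wakes it up again when new items arrive later.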

Furthermore, I'd be interested in applying some extra filtering on the queues, like throttling and "distinct until changed", so Rx seems like a natural fit here.
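As a rough sketch of the per-stream filtering meant here, assuming the System.Reactive package: each stream gets its own `DistinctUntilChanged` before the two are combined. The `high`/`low` subjects and the `FilterThenMergeDemo` class are made-up names for illustration, and the plain `Merge` used here interleaves by arrival order, so it does not provide the priority behaviour being asked about.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class FilterThenMergeDemo
{
    public static List<string> Run()
    {
        var seen = new List<string>();
        var high = new Subject<string>();   // hypothetical high-priority source
        var low = new Subject<string>();    // hypothetical low-priority source

        // Filter each stream separately, then combine.
        // A time-based filter such as Throttle(TimeSpan) could be chained
        // in the same position; it is left out to keep the run deterministic.
        var merged = high.DistinctUntilChanged()
            .Merge(low.DistinctUntilChanged());

        using (merged.Subscribe(x => seen.Add(x)))
        {
            low.OnNext("l1");
            low.OnNext("l1");   // suppressed by DistinctUntilChanged
            high.OnNext("h1");
            low.OnNext("l2");
        }
        return seen;
    }

    public static void Main() => Run().ForEach(Console.WriteLine);
}
```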

Chris McAtackney, asked Apr 10 '15 09:04


2 Answers

What you are describing is of course a priority queue.

Rx is all about streams of events, rather than queues. Of course, queues are used a lot in Rx - but they aren't a first-class concept; they are more an implementation detail of Rx's concepts.

A good example of where we need queues is to deal with a slow observer. Events are dispatched sequentially in Rx, and if events arrive faster than an observer can deal with them, then they must be queued against that observer. If there are many observers, then multiple logical queues must be maintained, since observers may progress at varying paces - and Rx chooses not to keep them in lock-step.

"Back-pressure" is the concept of observers providing feedback to observables in order to allow for mechanisms to handle the pressure of a faster observable - such as conflation or throttling. Rx doesn't have a first-class way of introducing back-pressure - the only built-in means an observable has of monitoring observers is via the synchronous nature of OnNext. Any other mechanism would need to be out of band. Your question relates directly to back-pressure, since it is only relevant in the case of a slow observer.
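To illustrate the synchronous nature of OnNext, here is a minimal sketch assuming the System.Reactive package and a plain `Subject<int>`; the `SyncOnNextDemo` class, the log messages and the 50 ms delay are arbitrary choices made for the example. The producer's call to OnNext does not return until the slow observer has finished handling the item.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using System.Threading;

public static class SyncOnNextDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        // A deliberately slow observer.
        subject.Subscribe(x =>
        {
            log.Add($"observer start {x}");
            Thread.Sleep(50);
            log.Add($"observer end {x}");
        });

        foreach (var x in new[] { 1, 2 })
        {
            subject.OnNext(x);   // blocks until the observer's handler returns
            log.Add($"producer resumed after {x}");
        }
        return log;
    }

    public static void Main() => Run().ForEach(Console.WriteLine);
}
```

In this arrangement the producer is simply held up by the observer; as soon as a scheduler or a queue sits between the two, that implicit feedback disappears and events start to build up.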

I mention all this to provide the evidence for my claim that Rx is not a great choice for providing the kind of priority dispatch you are looking for - really, a first-class queuing mechanism seems a better fit.

To solve the problem at hand, you need to manage the priority queuing yourself, in a custom operator. To restate the problem: what you are saying is that if events arrive during the observer handling of an OnNext event, such that there is a build-up of events to dispatch, then rather than the typical FIFO queue that Rx uses, you want to dispatch based on some priority.

Something to note is that in the spirit of how Rx doesn't keep multiple observers in lock-step, concurrent observers will potentially see events in a different order, which may or may not be an issue for you. You can use a mechanism like Publish to get order consistency - but you probably don't want to do this since the timing of event delivery would get quite unpredictable and inefficient in that scenario.

I'm sure there are better ways to do this, but here is one example of priority-queue based delivery. You could extend it to work for multiple streams and priorities (or even per-event priorities) using a better queue implementation (such as a b-tree based priority queue), but I've chosen to keep this fairly simple. Even then, note the significant number of concerns the code has to address, around error handling, completion etc. I have made choices about when these are signalled, and there are certainly plenty of other valid choices.

All-in-all, this implementation certainly puts me off the idea of using Rx for this. It's complex enough that there are probably bugs here anyway. As I said, there may be neater code for this (especially given the minimal effort I've put into it!), but conceptually, I am uncomfortable with the idea regardless of the implementation:

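using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ObservableExtensions
{
    public static IObservable<TSource> MergeWithLowPriorityStream<TSource>(
        this IObservable<TSource> source,
        IObservable<TSource> lowPriority,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        return Observable.Create<TSource>(o => {
            // BufferBlock from TPL dataflow is used as it is
            // handily awaitable. package: Microsoft.Tpl.Dataflow
            var loQueue = new BufferBlock<TSource>();
            var hiQueue = new BufferBlock<TSource>();
            var errorQueue = new BufferBlock<Exception>();
            var done = new TaskCompletionSource<int>();
            int doneCount = 0;
            // Signal completion only once both streams have completed.
            Action incDone = () => {
                var dc = Interlocked.Increment(ref doneCount);
                if(dc == 2)
                    done.SetResult(0);
            };
            // Keep the upstream subscriptions so that they can be
            // disposed together with the dispatch loop.
            var hiSubscription = source.Subscribe(
                x => hiQueue.Post(x),
                e => errorQueue.Post(e),
                incDone);
            var loSubscription = lowPriority.Subscribe(
                x => loQueue.Post(x),
                e => errorQueue.Post(e),
                incDone);
            var dispatchLoop = scheduler.ScheduleAsync(async(ctrl, ct) => {
                while(!ct.IsCancellationRequested)
                {
                    // Snapshot completion *before* polling the queues:
                    // items are posted before completion is signalled,
                    // so if this is true and both queues turn out to be
                    // empty, nothing can still be in flight.
                    var isDone = done.Task.IsCompleted;

                    // High priority items are always taken first.
                    TSource nextItem;
                    if(hiQueue.TryReceive(out nextItem)
                      || loQueue.TryReceive(out nextItem))
                        o.OnNext(nextItem);

                    else if(isDone)
                    {
                        o.OnCompleted();
                        return;
                    }

                    // Errors are signalled ahead of any items still queued.
                    Exception error;
                    if(errorQueue.TryReceive(out error))
                    {
                        o.OnError(error);
                        return;
                    }

                    // Wait until there is something to do.
                    var hiAvailableAsync = hiQueue.OutputAvailableAsync(ct);
                    var loAvailableAsync = loQueue.OutputAvailableAsync(ct);
                    var errAvailableAsync =
                        errorQueue.OutputAvailableAsync(ct);
                    await Task.WhenAny(
                        hiAvailableAsync,
                        loAvailableAsync,
                        errAvailableAsync,
                        done.Task);
                }
            });
            return new CompositeDisposable(
                hiSubscription, loSubscription, dispatchLoop);
        });
    }
}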

And example usage:

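static void Main()
{
    var xs = Observable.Range(0, 3);
    var ys = Observable.Range(10, 3);

    var source = ys.MergeWithLowPriorityStream(xs);

    source.Subscribe(Console.WriteLine, () => Console.WriteLine("Done"));

    // Items are delivered on the default scheduler, so a console
    // app needs to stay alive until "Done" has been printed.
    Console.ReadLine();
}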

This will print out the elements of ys first, indicating their higher priority.

James World, answered Nov 20 '22 01:11


You need to take time into account for a problem like this. In the comment above you talk about user notifications. It seems to me that what you want is a way to say something like this: display the most recent notification, unless there is a high priority notification, in which case display that.

Marble diagrams will make it easier to reason about this. One character is one second:

High  : ---------3---5-6
Low   : 1--2-------4----
Result: 1--2-----3---5-6

Is that what you had in mind? Would you like to buffer messages and display them later? For example, in this case, is it OK that message 5 would only be visible for 2 seconds?

Daniel Bachler, answered Nov 20 '22 01:11