
Reordering events with Reactive Extensions

I'm trying to reorder events arriving unordered on different threads.

Is it possible to create a reactive extension query that matches these marble diagrams:

s1          1   2       3   4

s2          1   3   2       4

result      1       2   3   4

and...

s1          1   2   3   4

s2          4   3   2   1

result                  1234

That is: Only publish results in version number order.

The closest I have got is using a Join to open a window each time s1 ticks and only close it when s2 arrives with the same number.

Like this:

var publishedEvents = events.Publish().RefCount();
publishedEvents.Join(
        publishedEvents.Scan(0, (i, o) => i + 1),
        expectedVersion => publishedEvents.Any(@event => @event.Version == expectedVersion),
        _ => Observable.Never<Unit>(),
        (@event, expectedVersion) => new {@event,expectedVersion})
    .Where(x => x.expectedVersion == x.@event.Version)
    .Select(x => x.@event)
    .Subscribe(Persist);

But that won't work with diagram no 2. Group 2 will be completed once s2 ticks with the number 2, and thus before 1.

Does it make sense? Can it be done with Rx? Should it?

EDIT: I guess it's like overlapping windows, where later windows can't close before all preceding windows have closed. And the preceding windows won't close before the window number matches the event version number.

EDIT 2:

I have something like this now, but it's not really the reactive, functional, thread-safe LINQ revelation I hoped for (please ignore that my events are JObjects for now):

var orderedEvents = Observable.Create<JObject>(observer =>
{
    var nextVersionExpected = 1;
    var previousEvents = new List<JObject>();
    return events
        .ObserveOn(Scheduler.CurrentThread)
        .Subscribe(@event =>
        {
            previousEvents.Add(@event);

            var version = (long) @event["Version"];
            if (version != nextVersionExpected) return;

            foreach (var previousEvent in previousEvents.OrderBy(x => (long) x["Version"]).ToList())
            {
                if ((long) previousEvent["Version"] != nextVersionExpected)
                    break;

                observer.OnNext(previousEvent);
                previousEvents.Remove(previousEvent);
                nextVersionExpected++;
            }
        });
});

asgerhallas asked Nov 12 '13 20:11


1 Answer

Introduction

The key to this problem is the sort. Any way you look at it, some form of buffering is required. Whilst no doubt some elaborate combination of operators might achieve this, I think this is a case where Observable.Create is a good choice.

Generalizing the solution

I've made some effort to generalize my approach to accept any type of ordering key. To do this, I expect to be given:

  • A key selector function used to obtain the key of an event, of type Func<TSource,TKey>
  • The initial key of type TKey
  • A function to get the next key in sequence, of type Func<TKey,TKey>
  • A result selector to generate the result from the paired up events in the source streams, of type Func<TSource,TSource,TSource>

Since I'm just using a 1-based integer sequence for my tests these are satisfied by:

  • keySelector: i => i
  • firstKey: 1
  • nextKeyFunc: k => k+1
  • resultSelector: (left,right) => left
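
As a further illustration, for events that carry an explicit version number, as in the question, the same four arguments might look roughly like the sketch below. VersionedEvent and VersionKeys are hypothetical names introduced here for illustration; they are not types from the question or from Rx.

```csharp
using System;

// Hypothetical event type: anything with a monotonically increasing version would do.
public sealed class VersionedEvent
{
    public long Version { get; set; }
    public string Payload { get; set; }
}

// Hypothetical holder for the four arguments described above.
public static class VersionKeys
{
    // keySelector: read the ordering key off the event.
    public static readonly Func<VersionedEvent, long> KeySelector = e => e.Version;

    // firstKey: the version the sequence is assumed to start at.
    public const long FirstKey = 1L;

    // nextKeyFunc: versions are assumed to be consecutive integers.
    public static readonly Func<long, long> NextKeyFunc = k => k + 1;

    // resultSelector: keep the event from the first stream, ignore the second.
    public static readonly Func<VersionedEvent, VersionedEvent, VersionedEvent> ResultSelector =
        (left, right) => left;
}
```

If versions were not consecutive (say, timestamps), nextKeyFunc would have no way to know which key to wait for, so this approach assumes the key sequence is predictable.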

Sort

Here is my Sort attempt. It buffers events into a Dictionary and flushes them as soon as possible to the subscriber:

public static IObservable<TSource> Sort<TSource, TKey>
    (this IObservable<TSource> source,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc)
{
    return Observable.Create<TSource>(o =>
    {
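        var nextKey = firstKey;
        // Events that arrived ahead of their turn, indexed by key.
        var buffer = new Dictionary<TKey, TSource>();
        return source.Subscribe(i =>
        {
            if (keySelector(i).Equals(nextKey))
            {
                // The expected event: emit it, then flush any buffered successors.
                nextKey = nextKeyFunc(nextKey);
                o.OnNext(i);
                TSource nextValue;
                while (buffer.TryGetValue(nextKey, out nextValue))
                {
                    buffer.Remove(nextKey);
                    o.OnNext(nextValue);
                    nextKey = nextKeyFunc(nextKey);
                }
            }
            else buffer.Add(keySelector(i), i); // hold until its turn comes
        },
        o.OnError,       // pass source errors through to the subscriber
        o.OnCompleted);  // pass completion through; anything still buffered is dropped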
    });
}

I have to say this is a pretty naïve implementation. In production code in the past I have elaborated on this with specific error handling, a fixed-size buffer and time-outs to prevent resource leakage. However, it will do for this example. :)
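
To give a rough idea of what a fixed-size buffer could look like, here is a minimal sketch of the same buffer-and-flush logic pulled out into a plain class with a capacity check. It is not the production code mentioned above: BoundedReorderBuffer, its Push method and the capacity parameter are hypothetical names made up for this illustration, and it uses only the base class library rather than Rx.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper for illustration; not part of Rx.
public sealed class BoundedReorderBuffer<TKey, TValue>
{
    private readonly Dictionary<TKey, TValue> _pending = new Dictionary<TKey, TValue>();
    private readonly Func<TKey, TKey> _nextKeyFunc;
    private readonly int _capacity;
    private TKey _nextKey;

    public BoundedReorderBuffer(TKey firstKey, Func<TKey, TKey> nextKeyFunc, int capacity)
    {
        _nextKey = firstKey;
        _nextKeyFunc = nextKeyFunc;
        _capacity = capacity;
    }

    // Accepts one item and returns every item that can now be released, in key order.
    public IList<TValue> Push(TKey key, TValue value)
    {
        var released = new List<TValue>();

        if (!EqualityComparer<TKey>.Default.Equals(key, _nextKey))
        {
            // Out of order: park the item, but refuse to grow without bound.
            if (_pending.Count >= _capacity)
                throw new InvalidOperationException("Reorder buffer is full.");
            _pending.Add(key, value); // throws ArgumentException on a duplicate key
            return released;
        }

        // The expected item: release it, then drain any parked successors.
        released.Add(value);
        _nextKey = _nextKeyFunc(_nextKey);

        TValue parked;
        while (_pending.TryGetValue(_nextKey, out parked))
        {
            _pending.Remove(_nextKey);
            released.Add(parked);
            _nextKey = _nextKeyFunc(_nextKey);
        }
        return released;
    }
}
```

Inside an Rx operator, one option would be to catch the InvalidOperationException in the subscription and route it to the observer's OnError, so that a gap that never fills surfaces as a stream error instead of a slow memory leak. A time-out would need a scheduler and is not shown here.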

With this sorted (sorry!), we can now look at handling multiple streams.

Combining Results

First Attempt

My first attempt produces an unordered stream of events that have been seen the required number of times, which can then be sorted. I do this by grouping elements by key, using GroupByUntil to hold each group until two elements have been captured. Each group is then a stream of results with the same key. For the simple example of integer events, I can just take the last element of each group. However, I don't like this because it's awkward for more real-world scenarios where each result stream may be contributing something useful. I include the code for the sake of interest. Note that, so the tests can be shared between this and my second attempt, I accept an unused resultSelector parameter:

public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
     IObservable<TSource> right,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc,
     Func<TSource,TSource,TSource> resultSelector)
{
    return left.Merge(right)
               .GroupByUntil(keySelector, x => x.Take(2).LastAsync())
               .SelectMany(x => x.LastAsync())
               .Sort(keySelector, firstKey, nextKeyFunc);
}

Aside: You can hack on the SelectMany clause to decide how to pick results. One advantage this solution has over the second attempt is that, in scenarios with many result streams, it is easier to see how to extend it to pick, say, the first two out of three result tuples to arrive.

Second Attempt

For this approach I sort each stream independently, and then Zip the results together. Not only is this a far simpler looking operation, it's also far easier to combine results from each stream in interesting ways. To keep the tests compatible with my first approach, I pick the resultSelector function to use the first stream's events as the results, but obviously you have flexibility to do something useful in your scenario:

public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
     IObservable<TSource> right,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc,
     Func<TSource, TSource, TSource> resultSelector)
{
    return Observable.Zip(
        left.Sort(keySelector, firstKey, nextKeyFunc),
        right.Sort(keySelector, firstKey, nextKeyFunc),
        resultSelector);
}

Aside: It isn't too hard to see how this code could be extended to a more general case accepting any number of input streams, but as alluded to earlier, using Zip makes it quite inflexible: it blocks at a given key until results from all streams are in.

Test Cases

Finally, here are my tests echoing your example scenarios. To run these, install the NuGet packages rx-testing and nunit and put the implementations above into a static class (one OrderedCollect at a time, since both attempts have the same signature):

using Microsoft.Reactive.Testing;
using NUnit.Framework;

public class ReorderingEventsTests : ReactiveTest
{
    [Test]
    public void ReorderingTest1()
    {
        var scheduler = new TestScheduler();

        var s1 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(400, 3),
            OnNext(500, 4));

        var s2 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 3),
            OnNext(300, 2),
            OnNext(500, 4));

        var results = scheduler.CreateObserver<int>();

        s1.OrderedCollect(
            right: s2,
            keySelector: i => i,
            firstKey: 1,
            nextKeyFunc: i => i + 1,
            resultSelector: (left,right) => left).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(300, 2),
            OnNext(400, 3),
            OnNext(500, 4));
    }

    [Test]
    public void ReorderingTest2()
    {
        var scheduler = new TestScheduler();

        var s1 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3),
            OnNext(400, 4));

        var s2 = scheduler.CreateColdObservable(
            OnNext(100, 4),
            OnNext(200, 3),
            OnNext(300, 2),
            OnNext(400, 1));

        var results = scheduler.CreateObserver<int>();

        s1.OrderedCollect(
            right: s2,
            keySelector: i => i,
            firstKey: 1,
            nextKeyFunc: i => i + 1,
            resultSelector: (left, right) => left).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(400, 1),
            OnNext(400, 2),
            OnNext(400, 3),
            OnNext(400, 4));
    }
}

Currying to avoid repetition

A final comment: because I hate repeating myself in code, here's a tweak that avoids the repetitious way I call Sort in the second approach. I've not included it in the main body to avoid confusing readers unfamiliar with currying:

public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
        IObservable<TSource> right,
        Func<TSource, TKey> keySelector,
        TKey firstKey,
        Func<TKey, TKey> nextKeyFunc,
        Func<TSource, TSource, TSource> resultSelector)
{
    Func<IObservable<TSource>, IObservable<TSource>> curriedSort =
        events => events.Sort(keySelector, firstKey, nextKeyFunc);

    return Observable.Zip(
        curriedSort(left),
        curriedSort(right),
        resultSelector);
}

James World answered Oct 21 '22 23:10