I'm trying to reorder events arriving unordered on different threads.
Is it possible to create a reactive extension query that matches these marble diagrams:
s1 1 2 3 4
s2 1 3 2 4
result 1 2 3 4
and...
s1 1 2 3 4
s2 4 3 2 1
result 1234
That is: Only publish results in version number order.
The closest I have got is using a Join to open a window each time s1 ticks and only close it when s2 arrives with the same number.
Like this:
var publishedEvents = events.Publish().RefCount();
publishedEvents.Join(
        publishedEvents.Scan(0, (i, o) => i + 1),
        expectedVersion => publishedEvents.Any(@event => @event.Version == expectedVersion),
        _ => Observable.Never<Unit>(),
        (@event, expectedVersion) => new {@event, expectedVersion})
    .Where(x => x.expectedVersion == x.@event.Version)
    .Select(x => x.@event)
    .Subscribe(Persist);
But that won't work with diagram no 2. Group 2 will be completed once s2 ticks with the number 2, and thus before 1.
Does it make sense? Can it be done with Rx? Should it?
EDIT: I guess it's like overlapping windows, where later windows can't close before all preceding windows have closed. And the preceding windows won't close before the window number matches the event version number.
EDIT 2:
I have something like this now, but it's not really the reactive, functional, thread-safe LINQ revelation I hoped for (please ignore that my events are JObjects for now):
var orderedEvents = Observable.Create<JObject>(observer =>
{
    var nextVersionExpected = 1;
    var previousEvents = new List<JObject>();
    return events
        .ObserveOn(Scheduler.CurrentThread)
        .Subscribe(@event =>
        {
            previousEvents.Add(@event);
            var version = (long) @event["Version"];
            if (version != nextVersionExpected) return;
            foreach (var previousEvent in previousEvents.OrderBy(x => (long) x["Version"]).ToList())
            {
                if ((long) previousEvent["Version"] != nextVersionExpected)
                    break;
                observer.OnNext(previousEvent);
                previousEvents.Remove(previousEvent);
                nextVersionExpected++;
            }
        });
});
The key to this problem is the sort. Any way you look at it, some form of buffering is required. Whilst no doubt some elaborate combination of operators might achieve this, I think this is a good example where Observable.Create is a good choice.
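I've made some effort to generalize my approach to accept any type of ordering key. To do this, I expect to be given:
A key selector function that obtains the key for each event, of type Func<TSource,TKey>
The initial key, of type TKey
A function that returns the next key in the sequence, of type Func<TKey,TKey>
A result selector that produces the result from the paired-up events in the source streams, of type Func<TSource,TSource,TSource>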
Since I'm just using a 1-based integer sequence for my tests these are satisfied by:
i => i
1
k => k+1
(left,right) => left
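For a key that isn't a bare integer, the same four pieces might look something like the sketch below. The `VersionedEvent` type and the `VersionedEventOrdering` class are hypothetical names made up for illustration; only the delegate shapes come from the list above.

```csharp
using System;

// Hypothetical event type, assumed for illustration only.
public sealed class VersionedEvent
{
    public VersionedEvent(long version, string payload)
    {
        Version = version;
        Payload = payload;
    }

    public long Version { get; }
    public string Payload { get; }
}

// The four ingredients, expressed for VersionedEvent with a long key.
public static class VersionedEventOrdering
{
    // Func<TSource,TKey>: extracts the ordering key from an event.
    public static readonly Func<VersionedEvent, long> KeySelector = e => e.Version;

    // TKey: the first key expected in the sequence.
    public const long FirstKey = 1L;

    // Func<TKey,TKey>: computes the key that follows a given key.
    public static readonly Func<long, long> NextKey = k => k + 1;

    // Func<TSource,TSource,TSource>: combines the two events that share a key;
    // here it simply keeps the event from the first stream.
    public static readonly Func<VersionedEvent, VersionedEvent, VersionedEvent> ResultSelector =
        (left, right) => left;
}
```

Any key type should work as long as it has sensible equality and a well-defined "next" value, since the buffering relies on both.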
Here is my Sort attempt. It buffers events into a Dictionary and flushes them as soon as possible to the subscriber:
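// Needs: using System; using System.Collections.Generic; using System.Reactive.Linq;
public static IObservable<TSource> Sort<TSource, TKey>
    (this IObservable<TSource> source,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc)
{
    return Observable.Create<TSource>(o =>
    {
        var nextKey = firstKey;
        var buffer = new Dictionary<TKey, TSource>();
        return source.Subscribe(i =>
        {
            if (keySelector(i).Equals(nextKey))
            {
                // The expected event has arrived: publish it, then flush
                // any buffered events that directly follow it.
                nextKey = nextKeyFunc(nextKey);
                o.OnNext(i);
                TSource nextValue;
                while (buffer.TryGetValue(nextKey, out nextValue))
                {
                    buffer.Remove(nextKey);
                    o.OnNext(nextValue);
                    nextKey = nextKeyFunc(nextKey);
                }
            }
            else buffer.Add(keySelector(i), i); // arrived early: hold it back
        },
        o.OnError,      // pass errors through to the subscriber
        o.OnCompleted); // pass completion through to the subscriber
    });
}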
I have to say this is a pretty naïve implementation. In production code in the past I have elaborated on this with specific error handling, a fixed-size buffer and time-outs to prevent resource leakage. However, it will do for this example. :)
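To make the buffering easier to trace, and to hint at what a fixed-size buffer could look like, here is a scheduler-free sketch in plain C#. It is not the production code mentioned above; `ReorderBuffer`, `Accept` and `maxBuffered` are hypothetical names, and throwing when the buffer is full is just one assumed policy.

```csharp
using System;
using System.Collections.Generic;

// Plain C# sketch of the buffering inside Sort, with a capacity limit added.
// All names here are hypothetical.
public sealed class ReorderBuffer<TSource, TKey>
{
    private readonly Func<TSource, TKey> _keySelector;
    private readonly Func<TKey, TKey> _nextKeyFunc;
    private readonly int _maxBuffered;
    private readonly Dictionary<TKey, TSource> _buffer = new Dictionary<TKey, TSource>();
    private TKey _nextKey;

    public ReorderBuffer(Func<TSource, TKey> keySelector, TKey firstKey,
                         Func<TKey, TKey> nextKeyFunc, int maxBuffered)
    {
        _keySelector = keySelector;
        _nextKey = firstKey;
        _nextKeyFunc = nextKeyFunc;
        _maxBuffered = maxBuffered;
    }

    // Returns the items that become publishable because of this arrival, in key order.
    public IReadOnlyList<TSource> Accept(TSource item)
    {
        var released = new List<TSource>();
        var key = _keySelector(item);

        if (!EqualityComparer<TKey>.Default.Equals(key, _nextKey))
        {
            // Arrived early: hold it back, unless the buffer is already full.
            if (_buffer.Count >= _maxBuffered)
                throw new InvalidOperationException("Reorder buffer is full.");
            _buffer.Add(key, item); // Dictionary.Add throws on a duplicate key
            return released;
        }

        // The expected item has arrived: release it and everything queued directly behind it.
        released.Add(item);
        _nextKey = _nextKeyFunc(_nextKey);
        TSource next;
        while (_buffer.TryGetValue(_nextKey, out next))
        {
            _buffer.Remove(_nextKey);
            released.Add(next);
            _nextKey = _nextKeyFunc(_nextKey);
        }
        return released;
    }
}
```

With a first key of 1 and room for three items, feeding 4, 3 and 2 releases nothing, and the arrival of 1 then releases 1, 2, 3, 4 in one go, which is the second marble diagram. With room for only two items, the third early arrival throws instead of growing the buffer.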
With this sorted (sorry!), we can now look at handling multiple streams.
My first attempt at this is to produce an unordered stream of events that have been seen the required number of times. This can then be sorted. I do this by grouping elements by key, using GroupByUntil to hold each group until two elements have been captured. Each group is then a stream of results with the same key. For the simple example of integer events, I can just take the last element of each group. However, I don't like this because it's awkward for more real-world scenarios where each result stream may be contributing something useful. I include the code for the sake of interest. Note that, so the tests can be shared between this and my second attempt, I accept an unused resultSelector parameter:
public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
     IObservable<TSource> right,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc,
     Func<TSource, TSource, TSource> resultSelector)
{
    return left.Merge(right)
               .GroupByUntil(keySelector, x => x.Take(2).LastAsync())
               .SelectMany(x => x.LastAsync())
               .Sort(keySelector, firstKey, nextKeyFunc);
}
Aside: You can hack on the SelectMany clause to decide how to pick results. One advantage this solution has over the second attempt is that in scenarios with many result streams it is easier to see how to extend it to pick, say, the first two out of three result tuples to arrive.
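As a rough illustration of the "first two out of three" idea, the bookkeeping could look like the plain C# sketch below. It is not Rx and not part of the code above; `QuorumCollector` and its members are hypothetical names, and it assumes each key arrives at most once per stream.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: hands back a key's results once `quorum` of them have arrived,
// and ignores any later results for that key.
public sealed class QuorumCollector<TSource, TKey>
{
    private readonly Func<TSource, TKey> _keySelector;
    private readonly int _quorum;
    private readonly Dictionary<TKey, List<TSource>> _pending = new Dictionary<TKey, List<TSource>>();
    // Keys already published. This grows without bound in this sketch.
    private readonly HashSet<TKey> _done = new HashSet<TKey>();

    public QuorumCollector(Func<TSource, TKey> keySelector, int quorum)
    {
        _keySelector = keySelector;
        _quorum = quorum;
    }

    // Returns the first `quorum` results for the item's key when the quorum is reached,
    // otherwise an empty list.
    public IReadOnlyList<TSource> Accept(TSource item)
    {
        var key = _keySelector(item);
        if (_done.Contains(key)) return Array.Empty<TSource>(); // late straggler

        List<TSource> seen;
        if (!_pending.TryGetValue(key, out seen))
        {
            seen = new List<TSource>();
            _pending.Add(key, seen);
        }
        seen.Add(item);

        if (seen.Count < _quorum) return Array.Empty<TSource>();

        _pending.Remove(key);
        _done.Add(key);
        return seen;
    }
}
```

The batches that come out are still unordered by key, so they would need to go through the same kind of sort as before.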
For my second attempt I sort each stream independently, and then Zip the results together. Not only is this a far simpler-looking operation, it's also far easier to combine results from each stream in interesting ways. To keep the tests compatible with my first approach, I pick the resultSelector function to use the first stream's events as the results, but obviously you have flexibility to do something useful in your scenario:
public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
     IObservable<TSource> right,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc,
     Func<TSource, TSource, TSource> resultSelector)
{
    return Observable.Zip(
        left.Sort(keySelector, firstKey, nextKeyFunc),
        right.Sort(keySelector, firstKey, nextKeyFunc),
        resultSelector);
}
Aside: It isn't too hard to see how this code could be extended to a more general case accepting any number of input streams, but as alluded to earlier, using Zip makes it quite inflexible about blocking at a given key until results from all streams are in.
Finally, here are my tests echoing your example scenarios. To run these, import the NuGet packages rx-testing and nunit and put the implementations above into a static class:
using Microsoft.Reactive.Testing;
using NUnit.Framework;

public class ReorderingEventsTests : ReactiveTest
{
    [Test]
    public void ReorderingTest1()
    {
        var scheduler = new TestScheduler();

        var s1 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(400, 3),
            OnNext(500, 4));

        var s2 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 3),
            OnNext(300, 2),
            OnNext(500, 4));

        var results = scheduler.CreateObserver<int>();

        s1.OrderedCollect(
            right: s2,
            keySelector: i => i,
            firstKey: 1,
            nextKeyFunc: i => i + 1,
            resultSelector: (left, right) => left).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(100, 1),
            OnNext(300, 2),
            OnNext(400, 3),
            OnNext(500, 4));
    }

    [Test]
    public void ReorderingTest2()
    {
        var scheduler = new TestScheduler();

        var s1 = scheduler.CreateColdObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3),
            OnNext(400, 4));

        var s2 = scheduler.CreateColdObservable(
            OnNext(100, 4),
            OnNext(200, 3),
            OnNext(300, 2),
            OnNext(400, 1));

        var results = scheduler.CreateObserver<int>();

        s1.OrderedCollect(
            right: s2,
            keySelector: i => i,
            firstKey: 1,
            nextKeyFunc: i => i + 1,
            resultSelector: (left, right) => left).Subscribe(results);

        scheduler.Start();

        results.Messages.AssertEqual(
            OnNext(400, 1),
            OnNext(400, 2),
            OnNext(400, 3),
            OnNext(400, 4));
    }
}
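The expected timestamps in these tests can be cross-checked by hand without a scheduler. The sketch below is plain C#, not Rx; `EmissionTimes`, `Sorted` and `Zipped` are hypothetical names, and it assumes keys run from 1 to n with each key arriving exactly once per stream. A sorted stream can release a key only once that key and every lower key have arrived, and Zip publishes a pair only when both sides have produced it.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class EmissionTimes
{
    // Tick at which a sorted stream releases keys 1..n, given the tick at which each key arrived.
    // Key k is released at the latest arrival among keys 1..k.
    public static long[] Sorted(IReadOnlyDictionary<int, long> arrivalByKey)
    {
        var result = new long[arrivalByKey.Count];
        long latest = 0;
        for (var key = 1; key <= arrivalByKey.Count; key++)
        {
            latest = Math.Max(latest, arrivalByKey[key]);
            result[key - 1] = latest;
        }
        return result;
    }

    // Zip publishes each pair at the later of the two release times.
    public static long[] Zipped(long[] left, long[] right)
    {
        return left.Zip(right, (l, r) => Math.Max(l, r)).ToArray();
    }
}
```

For the first test, s1 releases its keys at 100, 200, 400, 500 and s2 at 100, 300, 300, 500, so the zipped result appears at 100, 300, 400, 500. For the second test, s2 cannot release anything until key 1 arrives at 400, so all four results appear at 400.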
Final comment: because I hate repeating myself in code, here's a tweak that avoids the repetitious way I call Sort in the second approach. I've not included it in the main body to avoid confusing readers unfamiliar with currying:
public static IObservable<TSource> OrderedCollect<TSource, TKey>
    (this IObservable<TSource> left,
     IObservable<TSource> right,
     Func<TSource, TKey> keySelector,
     TKey firstKey,
     Func<TKey, TKey> nextKeyFunc,
     Func<TSource, TSource, TSource> resultSelector)
{
    Func<IObservable<TSource>, IObservable<TSource>> curriedSort =
        events => events.Sort(keySelector, firstKey, nextKeyFunc);

    return Observable.Zip(
        curriedSort(left),
        curriedSort(right),
        resultSelector);
}