Using RX queries, how to get which records have same status for a window of 3 seconds every second?

I have spent a few days looking at Rx and have read a lot: IntroToRx, the 101 Rx Samples, and many other places, but I can't figure this out. It sounds so simple, but I can't get what I need: I need to know which "ID" has been "stuck" in the 'STARTED' state for at least 30 minutes.

I have a class MyInfo that looks like this:

public class MyInfo
{
    public string ID { get; set; }
    public string Status { get; set; }
}

To help me test, I have set up a Subject like this:

var subject = new Subject<MyInfo>();
// Note: Subject<T> does not replay values, so the subscription to q8
// must be made before these OnNext calls for it to see them.
subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
subject.OnNext(new MyInfo() { ID = "2", Status = "PHASE1" });
subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
subject.OnNext(new MyInfo() { ID = "4", Status = "STARTED" });
subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
subject.OnNext(new MyInfo() { ID = "2", Status = "PHASE1" });
subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
subject.OnNext(new MyInfo() { ID = "4", Status = "PHASE2" });
subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
subject.OnNext(new MyInfo() { ID = "2", Status = "STOPPED" });
subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
subject.OnNext(new MyInfo() { ID = "4", Status = "STARTED" });
subject.OnCompleted();

My query and subscription look like this so far (I'm using seconds in the sample):

var q8 = from e in subject
         group e by new { ID = e.ID, Status = e.Status } into g
         from w in g.Buffer(timeSpan: TimeSpan.FromSeconds(3),
                            timeShift: TimeSpan.FromSeconds(1))
         select new
         {
             ID = g.Key.ID,
             Status = g.Key.Status,
             count = w.Count
         };
var subsc = q8.Subscribe(a => Console.WriteLine("{0} {1} {2}", a.ID, a.Status, a.count));

This gives me output that shows, for each ID, which states it has seen in the period and how many times:

ID   Status   Count
1    STARTED   3
2    PHASE1    2
3    STOPPED   3
4    STARTED   2
4    PHASE2    1
2    STOPPED   1

What I want to do next is, first, discard the IDs that have seen more than one state in the interval (so IDs 2 and 4 would be eliminated) and then, of those left, discard the ones whose status isn't "STARTED" (which would eliminate ID 3). That leaves ID 1, which is the record I'm looking for.

Is that the best approach to the problem? And how do I write that query?

Also, how can I have my subject send the messages at different intervals, so that I can test the windowing?

Thanks!

Laura asked Oct 09 '14 15:10


1 Answer

Implementation of a solution

My approach to solving the problem is to write an extension method that accepts an IObservable<MyInfo> stream of inputs (which could be a Subject) and an IScheduler, and returns a stream of the items that have become stuck. It looks like this:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    public static IObservable<MyInfo> StuckInfos(this IObservable<MyInfo> source,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;

        return source.Publish(pub =>
            pub.Where(x => x.Status == "STARTED")
                .SelectMany(
                    x => Observable.Return(x)
                        .Delay(TimeSpan.FromMinutes(30), scheduler)
                        .TakeUntil(pub.Where(y => y.ID == x.ID
                                                  && y.Status != "STARTED"))));
    }
}

There's quite a lot of Rx going on here! Let's take it bit by bit...

The general idea is that we want to look for MyInfo instances (hereafter "items") in the "STARTED" state that go "unanswered" for 30 minutes by a non-started item with a matching ID.

Ignore the Publish bit for now; I'll get back to it. Just imagine that the pub variable is the source.

Step 1 - Filter for "STARTED" items

pub.Where(x => x.Status == "STARTED")

This bit is easy: we just filter the source to get only the "STARTED" items.

Step 2 - Turn each item into a delayed stream of its own

This is a bit trickier. From the moment an item appears, we know that 30 minutes later we want to answer the question, "Did another item turn up to unstick it?" To help us do this, we create a new stream that will emit just the info itself after 30 minutes. Our plan is to cut this stream short if a qualifying item turns up to unstick it. Assuming x is the info, we do this:

Observable.Return(x)
          .Delay(TimeSpan.FromMinutes(30), scheduler)

Observable.Return converts an item into an observable stream that immediately issues an OnNext for the item and then an OnCompleted. Despite seeming a bit worthless, it's actually a wonderfully useful building block, because it lets us create a new observable stream from any item. (For some advanced reading: this is sometimes called the unit function, a key part of the monad structure of IObservable.) Once we have this stream, we Delay it so that the item emerges 30 minutes later.

Note how we specify our scheduler when calling Delay - this will facilitate testing with simulated time.
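To see Step 2 in isolation, here is a small sketch. It assumes the System.Reactive and Microsoft.Reactive.Testing NuGet packages and C# 9 top-level statements, passes a TestScheduler to Delay so that 30 virtual minutes pass instantly, and uses a plain string as a stand-in for a MyInfo item.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

var scheduler = new TestScheduler();
var emittedAt = new List<long>(); // virtual-clock ticks at which the item emerged

// Observable.Return emits "1" as soon as we subscribe (virtual time 0);
// Delay, running on the test scheduler, holds it back for 30 virtual minutes.
Observable.Return("1")
    .Delay(TimeSpan.FromMinutes(30), scheduler)
    .Subscribe(_ => emittedAt.Add(scheduler.Clock));

// At 29 virtual minutes the item has not emerged yet.
scheduler.AdvanceTo(TimeSpan.FromMinutes(29).Ticks);
int seenAt29Minutes = emittedAt.Count;

// At 30 virtual minutes it has.
scheduler.AdvanceTo(TimeSpan.FromMinutes(30).Ticks);
int seenAt30Minutes = emittedAt.Count;

Console.WriteLine($"{seenAt29Minutes} {seenAt30Minutes}"); // 0 1
```

Nothing real-time is involved: the item only moves when the virtual clock is advanced, which is exactly why the scheduler is a parameter of StuckInfos.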

Step 3 - Abandon the delayed stream if an item becomes "unstuck"

Now, if a qualifying item that renders us "unstuck" arrives before the 30 minutes are up, we are no longer interested in this item.

The qualifying criterion for an "unsticking" item is that it matches by ID and does not have a "STARTED" status. I assumed that another "STARTED" copy of the item is not good enough to unstick a stuck item! (If that's wrong, drop the status check so that any item with a matching ID qualifies.) If an unsticking item arrives in the original, undelayed stream (pub), we use TakeUntil to terminate the delayed stream before the delayed item has a chance to emerge:

.TakeUntil(pub.Where(y => y.ID == x.ID
                          && y.Status != "STARTED"))
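A sketch of this TakeUntil behaviour, under the same assumptions (System.Reactive and Microsoft.Reactive.Testing packages, C# 9 top-level statements). Here a Subject of (ID, Status) tuples is a hypothetical stand-in for both pub and MyInfo. IDs 1 and 4 each get a delayed "STARTED" item, but only ID 4 later receives a matching non-STARTED item, so only ID 1 should emerge after 30 virtual minutes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

var scheduler = new TestScheduler();
var pub = new Subject<(string ID, string Status)>(); // stand-in for the undelayed source
var stuck = new List<(string ID, string Status)>();

foreach (var x in new[] { (ID: "1", Status: "STARTED"), (ID: "4", Status: "STARTED") })
{
    // Each delayed item is abandoned if a matching non-STARTED item arrives first.
    Observable.Return(x)
        .Delay(TimeSpan.FromMinutes(30), scheduler)
        .TakeUntil(pub.Where(y => y.ID == x.ID && y.Status != "STARTED"))
        .Subscribe(stuck.Add);
}

scheduler.AdvanceTo(TimeSpan.FromMinutes(5).Ticks);
pub.OnNext(("1", "STARTED"));  // another STARTED copy: does not unstick ID 1
pub.OnNext(("3", "STOPPED"));  // different ID: unsticks nothing
scheduler.AdvanceTo(TimeSpan.FromMinutes(10).Ticks);
pub.OnNext(("4", "PHASE2"));   // matching ID, non-STARTED status: unsticks ID 4

scheduler.AdvanceTo(TimeSpan.FromMinutes(30).Ticks);
Console.WriteLine(string.Join(", ", stuck)); // (1, STARTED)
```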

Step 4 - Sort out all these item streams

Now we're in a bit of a mess, because we projected every item into its own stream: we have a stream of streams, and we somehow need to get back to a single stream. To do this, we use SelectMany (advanced reading: it is equivalent to monadic bind). SelectMany does two jobs for us here: it allows us to map an item into a stream AND flatten the resulting stream of streams back into a single stream, all in one go. The mapping function we use is the one we've just been building, so putting it all together so far we have:

pub.Where(x => x.Status == "STARTED")
   .SelectMany(
       x => Observable.Return(x)
                      .Delay(TimeSpan.FromMinutes(30), scheduler)
                      .TakeUntil(pub.Where(y => y.ID == x.ID
                                                && y.Status != "STARTED")));
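The flattening can be sketched the same way (same package and C# 9 assumptions, with a hypothetical Subject of (ID, Status) tuples standing in for pub; subscribing to it twice is harmless here only because a Subject is hot, which is the concern Step 5 deals with). Two "STARTED" items arrive 5 virtual minutes apart, and their separately delayed inner streams come out of the one SelectMany result in time order.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

var scheduler = new TestScheduler();
var pub = new Subject<(string ID, string Status)>(); // hot stand-in for the source
var emerged = new List<(string ID, double Minute)>();

// SelectMany maps each STARTED item to its own delayed stream and merges
// all of those inner streams into a single IObservable of items.
IObservable<(string ID, string Status)> stuck =
    pub.Where(x => x.Status == "STARTED")
       .SelectMany(x => Observable.Return(x)
           .Delay(TimeSpan.FromMinutes(30), scheduler)
           .TakeUntil(pub.Where(y => y.ID == x.ID && y.Status != "STARTED")));

stuck.Subscribe(x => emerged.Add((x.ID, TimeSpan.FromTicks(scheduler.Clock).TotalMinutes)));

pub.OnNext(("1", "STARTED"));                        // virtual minute 0
scheduler.AdvanceTo(TimeSpan.FromMinutes(5).Ticks);
pub.OnNext(("3", "STARTED"));                        // virtual minute 5
scheduler.AdvanceTo(TimeSpan.FromMinutes(60).Ticks);

foreach (var (id, minute) in emerged)
    Console.WriteLine($"{id} emerged at minute {minute}");
```

The subscriber never sees the inner streams themselves, only the items they carry, interleaved by the time each one emerges (minute 30 for ID 1, minute 35 for ID 3).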

Step 5 - Ensure we don't mess with the source stream by using Publish

We're looking good, but there's one subtle problem left to tackle in the above. Notice that we subscribe to the source (pub) stream more than once: in the initial Where filter AND in the TakeUntil.

The problem is that subscribing to the same stream more than once can have unexpected consequences. Some streams are "cold", with each subscriber starting its own chain of events. This can be particularly tricky in queries where time is a critical factor. There can be other issues too, but I don't want to drift too far into that here. Basically, we need to be very careful to subscribe only once to the source stream. The Publish() method can do this for us: it subscribes to the source once and then multicasts it to many subscribers.

So the pub that appears in the lambda is a shared copy of the source that we can safely subscribe to multiple times.
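A small sketch of the difference Publish makes, assuming the System.Reactive package and C# 9 top-level statements. The source is a hypothetical cold observable that counts how often it is subscribed to; the query shape (a Where plus a TakeUntil over the same source) mirrors the one above.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

int subscriptions = 0;

// A cold source: this subscribe function (and its side effect) runs once per subscriber.
var source = Observable.Create<int>(observer =>
{
    subscriptions++;
    observer.OnNext(1);
    observer.OnCompleted();
    return Disposable.Empty;
});

// Without Publish: the Where and the TakeUntil each subscribe to the source.
source.Where(x => x > 0)
      .TakeUntil(source.Where(x => x < 0))
      .Subscribe(_ => { });
int withoutPublish = subscriptions;

// With Publish: one subscription to the source, multicast to both uses of pub.
subscriptions = 0;
source.Publish(pub => pub.Where(x => x > 0).TakeUntil(pub.Where(x => x < 0)))
      .Subscribe(_ => { });
int withPublish = subscriptions;

Console.WriteLine($"{withoutPublish} {withPublish}"); // 2 1
```

With a source that does real work on subscription (opening a connection, starting a timer), that second subscription would mean a second, independent chain of events, which is exactly what Publish avoids.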

How to address testing

You will need to control time for this. Your best bet is to use the purpose-built Rx testing facilities in the rx-testing NuGet package. With these, you can use a TestScheduler to control time and schedule test events.

Here's a trivial test that detects a simple stuck item.

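using System;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

public class StuckDetectorTests : ReactiveTest
{
    [Test]
    public void FindSingleStuckItem()
    {
        var testScheduler = new TestScheduler();

        // MyInfo.Started("1") is a factory helper for a "STARTED" item with ID "1" (see below).
        var xs = testScheduler.CreateColdObservable(
            OnNext(TimeSpan.FromMinutes(5).Ticks, MyInfo.Started("1")));

        var results = testScheduler.CreateObserver<MyInfo>();

        xs.StuckInfos(testScheduler).Subscribe(results);

        testScheduler.Start();

        // Started at 5 minutes and never unstuck, so it is reported 30 minutes later.
        results.Messages.AssertEqual(
            OnNext(TimeSpan.FromMinutes(35).Ticks, MyInfo.Started("1")));
    }
}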

Make sure to derive your test class from ReactiveTest to leverage the OnXXX helper methods.

I also created some helpful factory methods on MyInfo and implemented equality overloads to make testing easier.
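On the question of sending messages at different intervals: a TestScheduler can also create a hot observable whose messages are stamped with arbitrary virtual times. The sketch below assumes the System.Reactive and Microsoft.Reactive.Testing packages and C# 9 top-level statements. It reproduces the StuckInfos query as a local function over (ID, Status) tuples, a hypothetical stand-in for MyInfo that needs no factory methods or equality overloads, and it uses `using static` on ReactiveTest because a top-level program cannot derive from it. Minutes and Info are hypothetical helpers.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using static Microsoft.Reactive.Testing.ReactiveTest;

var scheduler = new TestScheduler();

// Hot source: each message fires at its own virtual time, however irregular the gaps.
var source = scheduler.CreateHotObservable(
    OnNext(Minutes(1), Info("1", "STARTED")),
    OnNext(Minutes(2), Info("2", "PHASE1")),
    OnNext(Minutes(3), Info("4", "STARTED")),
    OnNext(Minutes(20), Info("4", "PHASE2")),  // unsticks ID 4 before its 30 minutes are up
    OnNext(Minutes(25), Info("3", "STOPPED")));

var results = scheduler.CreateObserver<(string ID, string Status)>();
StuckInfos(source, scheduler).Subscribe(results);
scheduler.Start(); // runs the virtual clock until nothing is left to schedule

foreach (var message in results.Messages)
    Console.WriteLine($"{message.Value.Value.ID} stuck, reported at minute " +
                      $"{TimeSpan.FromTicks(message.Time).TotalMinutes}");

// Same shape as the StuckInfos extension method, over tuples instead of MyInfo.
static IObservable<(string ID, string Status)> StuckInfos(
    IObservable<(string ID, string Status)> infos, IScheduler delayScheduler) =>
    infos.Publish(pub =>
        pub.Where(x => x.Status == "STARTED")
           .SelectMany(x => Observable.Return(x)
               .Delay(TimeSpan.FromMinutes(30), delayScheduler)
               .TakeUntil(pub.Where(y => y.ID == x.ID && y.Status != "STARTED"))));

static long Minutes(int m) => TimeSpan.FromMinutes(m).Ticks;
static (string ID, string Status) Info(string id, string status) => (id, status);
```

With these messages, only ID 1 should be reported, at virtual minute 31. Moving the timestamps around is all it takes to try other interval patterns.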

The full code is quite lengthy - I published a gist that has more tests here: https://gist.github.com/james-world/62dca2fe2f91531a0401

There's also a good blog post on testing Rx here.

James World answered Oct 23 '22 11:10