I have spent a few days looking at Rx and have read a lot: I have read IntroToRx, looked at 101 Rx Samples and many other places, but I can't figure this out. It sounds so simple, but I can't get what I need: I need to know which "ID" has been "stuck" in the 'STARTED' state for at least 30 minutes.
I have a class MyInfo that looks like this:
public class MyInfo
{
    public string ID { get; set; }
    public string Status { get; set; }
}
I have also coded a Subject to help me test, like this:
var subject = new Subject<MyInfo>();
subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
subject.OnNext(new MyInfo() { ID = "2", Status = "PHASE1" });
subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
subject.OnNext(new MyInfo() { ID = "4", Status = "STARTED" });
subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
subject.OnNext(new MyInfo() { ID = "2", Status = "PHASE1" });
subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
subject.OnNext(new MyInfo() { ID = "4", Status = "PHASE2" });
subject.OnNext(new MyInfo() { ID = "1", Status = "STARTED" });
subject.OnNext(new MyInfo() { ID = "2", Status = "STOPPED" });
subject.OnNext(new MyInfo() { ID = "3", Status = "STOPPED" });
subject.OnNext(new MyInfo() { ID = "4", Status = "STARTED" });
subject.OnCompleted();
My query and subscription look like this so far (I'm using seconds in the sample):
var q8 = from e in subject
         group e by new { ID = e.ID, Status = e.Status } into g
         from w in g.Buffer(timeSpan: TimeSpan.FromSeconds(3),
                            timeShift: TimeSpan.FromSeconds(1))
         select new
         {
             ID = g.Key.ID,
             Status = g.Key.Status,
             count = w.Count
         };
var subsc = q8.Subscribe(a => Console.WriteLine("{0} {1} {2}", a.ID, a.Status, a.count));
Now I get output that tells me each ID and which states that ID has seen during the period:
ID  Status   Count
1   STARTED  3
2   PHASE1   2
3   STOPPED  3
4   STARTED  2
4   PHASE2   1
2   STOPPED  1
What I want to do next is, first, discard the IDs that have seen more than one state in the interval (so IDs 2 and 4 would be eliminated) and then, of those left over, discard the ones whose status isn't "STARTED" (that would eliminate ID 3). ID 1 is the record I'm looking for.
Is that the best approach to the problem? And how do I write that query?
Also, how can I have my subject send the messages at different intervals, so I can test the windowing?
Thanks!
My approach to solving the problem is to write an extension method that accepts an IObservable<MyInfo> stream of inputs (which could be a Subject) and an IScheduler, and returns a stream of items that have become stuck. It looks like this:
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    // Emits each "STARTED" item that is not followed, within 30 minutes,
    // by an item with the same ID and a non-"STARTED" status.
    public static IObservable<MyInfo> StuckInfos(this IObservable<MyInfo> source,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        return source.Publish(pub =>
            pub.Where(x => x.Status == "STARTED")
               .SelectMany(
                   x => Observable.Return(x)
                       .Delay(TimeSpan.FromMinutes(30), scheduler)
                       .TakeUntil(pub.Where(y => y.ID == x.ID
                                             && y.Status != "STARTED"))));
    }
}
There's quite a lot of Rx going on here! Let's take it bit by bit...
The general idea is that we want to look for MyInfo instances (hereafter "items") in the "STARTED" state that go "unanswered" for 30 minutes by a non-started item with a matching ID.
Ignore the Publish bit for now; I'll get back to that. Just imagine the pub variable is the source.
pub.Where(x => x.Status == "STARTED")
This bit is easy: we just filter the source to get only the "STARTED" items.
This is a bit trickier. From the moment an item appears, we know that 30 minutes hence we want to answer the question, "Did another item turn up to unstick it?" To help us do this, we create a new stream that emits just the info itself after 30 minutes. Our plan is to cut this stream short if a qualifying item turns up to unstick it. Assuming x is the info, we do this:
Observable.Return(x)
.Delay(TimeSpan.FromMinutes(30), scheduler)
Observable.Return converts an item into an observable stream that immediately issues an OnNext for the item and then an OnCompleted. Despite seeming a bit worthless, it's actually a wonderfully useful building block (for some advanced reading, this is sometimes called the unit function, a key part of the IObservable monad structure), as it enables us to create a new observable stream from any item. Once we have this stream, we Delay it so the item emerges 30 minutes later.
Note how we specify our scheduler when calling Delay; this will facilitate testing with simulated time.
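To see the Return/Delay step in isolation, here is a minimal sketch that runs it on a TestScheduler, so the 30 minutes pass in virtual time rather than on the wall clock. It assumes the System.Reactive and Microsoft.Reactive.Testing NuGet packages; the class name ReturnDelayDemo and the value "1" are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class ReturnDelayDemo
{
    // Records, in virtual time, what Observable.Return(x).Delay(...) emits.
    public static IList<Recorded<Notification<string>>> Run()
    {
        var scheduler = new TestScheduler();
        var observer = scheduler.CreateObserver<string>();

        // Return emits "1" and completes as soon as it is subscribed (virtual time 0);
        // Delay shifts that output 30 virtual minutes later on the test scheduler.
        Observable.Return("1")
            .Delay(TimeSpan.FromMinutes(30), scheduler)
            .Subscribe(observer);

        // Runs all scheduled work; no real 30-minute wait is involved.
        scheduler.Start();
        return observer.Messages;
    }
}
```

The first recorded message should be the OnNext for "1", stamped at TimeSpan.FromMinutes(30).Ticks, with the OnCompleted after it.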
Now, if a qualifying item that renders the original "unstuck" arrives before the 30 minutes are up, we are no longer interested in the original item.
The qualifying criterion for an "unsticking" item is that it matches by ID and does not have a "STARTED" status. I assumed that another "STARTED" copy of the item is not good enough to unstick a stuck item! (If that's wrong, drop the status check so that any item with a matching ID qualifies for unsticking.) If an unsticking item arrives in the original undelayed stream (pub), we use TakeUntil to terminate the delayed stream before the delayed item has a chance to emerge:
.TakeUntil(pub.Where(y => y.ID == x.ID
                       && y.Status != "STARTED"))
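Here is a minimal sketch of the TakeUntil cut-off on its own, again on a TestScheduler with the System.Reactive and Microsoft.Reactive.Testing packages assumed. The unstick hot observable is a hypothetical stand-in for pub.Where(...): it fires at 10 virtual minutes, before the delayed item is due at 30.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class TakeUntilDemo
{
    public static IList<Recorded<Notification<string>>> Run()
    {
        var scheduler = new TestScheduler();

        // Hypothetical "unsticking" signal arriving 10 virtual minutes in.
        var unstick = scheduler.CreateHotObservable(
            ReactiveTest.OnNext(TimeSpan.FromMinutes(10).Ticks, "unstick"));

        var observer = scheduler.CreateObserver<string>();

        // The delayed item is due at 30 minutes, but TakeUntil completes the
        // stream (and disposes the pending Delay) when unstick fires.
        Observable.Return("1")
            .Delay(TimeSpan.FromMinutes(30), scheduler)
            .TakeUntil(unstick)
            .Subscribe(observer);

        scheduler.Start();
        return observer.Messages;
    }
}
```

The only recorded message should be an OnCompleted at 10 minutes; the item itself never emerges.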
Now, we're in a bit of a mess because we projected every item into its own stream: we have a stream of streams, and we somehow need to get back to a single stream. To do this, we use SelectMany (advanced reading: equivalent to a monad bind). SelectMany does two jobs for us here: it allows us to map an item into a stream AND flatten the resulting stream of streams back into a single stream, all in one go. The mapping function we will use is the one we've just been building, so putting it all together so far we have:
pub.Where(x => x.Status == "STARTED")
   .SelectMany(
       x => Observable.Return(x)
           .Delay(TimeSpan.FromMinutes(30), scheduler)
           .TakeUntil(pub.Where(y => y.ID == x.ID
                                 && y.Status != "STARTED")));
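To see the flattening on its own, here is a minimal sketch in which two source items, arriving at 1 and 5 virtual minutes, each become a delayed inner stream, and SelectMany merges those inner streams back into one. The System.Reactive and Microsoft.Reactive.Testing packages are assumed; SelectManyDemo and the values "a" and "b" are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class SelectManyDemo
{
    public static IList<Recorded<Notification<string>>> Run()
    {
        var scheduler = new TestScheduler();

        // Two items arriving at different virtual times.
        var source = scheduler.CreateHotObservable(
            ReactiveTest.OnNext(TimeSpan.FromMinutes(1).Ticks, "a"),
            ReactiveTest.OnNext(TimeSpan.FromMinutes(5).Ticks, "b"));

        var observer = scheduler.CreateObserver<string>();

        // Each item is mapped to its own delayed stream; SelectMany flattens
        // all of those inner streams into a single output stream.
        source
            .SelectMany(x => Observable.Return(x).Delay(TimeSpan.FromMinutes(30), scheduler))
            .Subscribe(observer);

        scheduler.Start();
        return observer.Messages;
    }
}
```

The output should contain "a" at 31 minutes and "b" at 35 minutes. There is no OnCompleted, because the hot source never completes.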
Publish
We're looking good, but there's one subtle problem left to tackle in the above. You will notice that we subscribe to the source (pub) stream more than once: in the initial Where filter AND in the TakeUntil.
The problem with this is that subscribing to the same stream more than once can have unexpected consequences. Some streams are "cold", with each subscriber starting its own chain of events. This can be particularly tricky in queries where time is a critical factor. There can be other issues too, but I don't want to drift too far into this here. Basically, we need to be very careful to subscribe only once to the source stream. The Publish method (here, the overload that takes a selector function) can do this for us: it subscribes to the source once and then multicasts it to many subscribers.
So the pub that appears in the lambda is a shared copy of the source that we can safely subscribe to multiple times.
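Here is a minimal sketch of the difference, using a cold test observable whose Subscriptions list records every subscription made to it. The query shape (a Where plus a TakeUntil over the same source) mirrors StuckInfos. The System.Reactive and Microsoft.Reactive.Testing packages are assumed; PublishDemo and the integer values are made up for illustration.

```csharp
using System;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class PublishDemo
{
    // Returns how many times the cold test source was subscribed to.
    public static int CountSubscriptions(bool usePublish)
    {
        var scheduler = new TestScheduler();
        var xs = scheduler.CreateColdObservable(
            ReactiveTest.OnNext(100, 1),
            ReactiveTest.OnNext(200, 2),
            ReactiveTest.OnNext(300, 3),
            ReactiveTest.OnCompleted<int>(400));

        IObservable<int> query;
        if (usePublish)
        {
            // Publish subscribes to xs once and shares it with both Where calls.
            query = xs.Publish(pub => pub.Where(x => x > 1).TakeUntil(pub.Where(x => x == 3)));
        }
        else
        {
            // Without Publish, the Where and the TakeUntil each subscribe to xs.
            query = xs.Where(x => x > 1).TakeUntil(xs.Where(x => x == 3));
        }

        query.Subscribe(_ => { });
        scheduler.Start();
        return xs.Subscriptions.Count;
    }
}
```

CountSubscriptions(false) should return 2, and CountSubscriptions(true) should return 1.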
To test with messages arriving at different intervals, you will need to control time. Your best bet is to use the purpose-built Rx testing facilities in the NuGet package rx-testing (published today as Microsoft.Reactive.Testing). With this, you can use a TestScheduler to control time and schedule test events.
Here's a trivial test that detects a simple stuck item.
using System;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

public class StuckDetectorTests : ReactiveTest
{
    [Test]
    public void FindSingleStuckItem()
    {
        var testScheduler = new TestScheduler();

        // A "STARTED" item arrives 5 minutes after subscription...
        var xs = testScheduler.CreateColdObservable(
            OnNext(TimeSpan.FromMinutes(5).Ticks, MyInfo.Started("1")));

        var results = testScheduler.CreateObserver<MyInfo>();
        xs.StuckInfos(testScheduler).Subscribe(results);

        testScheduler.Start();

        // ...and is reported as stuck 30 minutes later.
        results.Messages.AssertEqual(
            OnNext(TimeSpan.FromMinutes(35).Ticks, MyInfo.Started("1")));
    }
}
Make sure to derive your test class from ReactiveTest to leverage the OnXXX helper methods.
I also created some helpful factory methods on MyInfo and implemented equality overloads to make testing easier.
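The gist isn't reproduced here, so the following is only a hypothetical sketch of what such a MyInfo might look like. Started matches the call used in the test above; WithStatus is an illustrative name, not taken from the gist. Value equality lets AssertEqual compare expected and actual items by content rather than by reference.

```csharp
using System;

// Hypothetical sketch; the gist's actual MyInfo may differ.
public class MyInfo : IEquatable<MyInfo>
{
    public string ID { get; set; }
    public string Status { get; set; }

    // Factory used by the test: MyInfo.Started("1").
    public static MyInfo Started(string id) => new MyInfo { ID = id, Status = "STARTED" };

    // Hypothetical helper for building items in any other state.
    public static MyInfo WithStatus(string id, string status) => new MyInfo { ID = id, Status = status };

    // Two items are equal when both ID and Status match.
    public bool Equals(MyInfo other) =>
        !(other is null) && ID == other.ID && Status == other.Status;

    public override bool Equals(object obj) => Equals(obj as MyInfo);

    public override int GetHashCode() => (ID, Status).GetHashCode();

    public static bool operator ==(MyInfo left, MyInfo right) =>
        ReferenceEquals(left, right) || (!(left is null) && left.Equals(right));

    public static bool operator !=(MyInfo left, MyInfo right) => !(left == right);

    // Readable output in assertion failure messages.
    public override string ToString() => $"{ID} {Status}";
}
```

Because ID and Status stay settable (as in the question), avoid mutating an instance after using it as a dictionary key, since its hash code would change.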
The full code is quite lengthy; I published a gist that has more tests here: https://gist.github.com/james-world/62dca2fe2f91531a0401
There's also a good blog post on testing Rx here.
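To address the question's request for messages at different intervals, a hot observable created with CreateHotObservable can stand in for the Subject, with each event stamped at its own virtual time. The sketch below repeats StuckInfos (using the question's ID property) in a class named StuckInfoExtensions so the block compiles on its own. The event times and the IntervalScenario helper are made up for illustration, and the System.Reactive and Microsoft.Reactive.Testing packages are assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public class MyInfo
{
    public string ID { get; set; }
    public string Status { get; set; }
}

public static class StuckInfoExtensions
{
    // Same query as StuckInfos above.
    public static IObservable<MyInfo> StuckInfos(this IObservable<MyInfo> source,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        return source.Publish(pub =>
            pub.Where(x => x.Status == "STARTED")
               .SelectMany(x => Observable.Return(x)
                   .Delay(TimeSpan.FromMinutes(30), scheduler)
                   .TakeUntil(pub.Where(y => y.ID == x.ID && y.Status != "STARTED"))));
    }
}

public static class IntervalScenario
{
    static long Ticks(int minutes) => TimeSpan.FromMinutes(minutes).Ticks;

    static Recorded<Notification<MyInfo>> At(int minutes, string id, string status) =>
        ReactiveTest.OnNext(Ticks(minutes), new MyInfo { ID = id, Status = status });

    // Returns (virtual minute, ID) for each stuck item reported.
    public static List<(long Minutes, string ID)> Run()
    {
        var scheduler = new TestScheduler();

        // Plays the role of the Subject, but each event has its own virtual time.
        var events = scheduler.CreateHotObservable(
            At(1, "1", "STARTED"),
            At(2, "2", "PHASE1"),
            At(3, "3", "STOPPED"),
            At(4, "4", "STARTED"),
            At(10, "4", "PHASE2"),   // unsticks ID 4 six minutes after it started
            At(12, "2", "STOPPED"));

        var observer = scheduler.CreateObserver<MyInfo>();
        events.StuckInfos(scheduler).Subscribe(observer);
        scheduler.Start();

        return observer.Messages
            .Where(m => m.Value.Kind == NotificationKind.OnNext)
            .Select(m => (Minutes: m.Time / TimeSpan.TicksPerMinute, ID: m.Value.Value.ID))
            .ToList();
    }
}
```

Only ID 1 should be reported, at 31 virtual minutes: ID 4 is unstuck by its PHASE2 event at minute 10, and IDs 2 and 3 never enter the "STARTED" state.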