How to split an Observable stream in chunks, dependent on second stream?

I thought this would be easy, but my brain is melting right now...

The problem

Given the following IObservable<int> Stream: 1 1 0 0 0 1 0 0 1 0 1

I want to split it into an IObservable<IEnumerable<int>> Stream of the form

1

1 0 0 0

1 0 0

1 0

1

So whenever a 0 arrives, it just gets added to the current IEnumerable, and whenever a 1 arrives, a new list is started. This is a slightly cleaner statement of what my real problem is.
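To pin down the rule before bringing Rx into it, here is a minimal synchronous sketch over a plain IEnumerable<int>. The names ChunkSketch and SplitOnOnes are hypothetical and not part of Rx. The sketch also assumes that any 0 seen before the first 1 is dropped, which the question does not specify.

```csharp
using System;
using System.Collections.Generic;

static class ChunkSketch
{
    // Hypothetical helper (not an Rx API): starts a new chunk at every 1
    // and appends every 0 to the chunk currently being built.
    public static IEnumerable<List<int>> SplitOnOnes(IEnumerable<int> source)
    {
        List<int> current = null;
        foreach (var n in source)
        {
            if (n == 1)
            {
                // A 1 closes the chunk in progress (if any) and opens a new one.
                if (current != null) yield return current;
                current = new List<int> { n };
            }
            else if (current != null)
            {
                current.Add(n);
            }
            // Assumption: a 0 seen before the first 1 is silently dropped.
        }
        // Emit the last chunk once the input ends.
        if (current != null) yield return current;
    }

    static void Main()
    {
        var ints = new[] { 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1 };
        foreach (var chunk in SplitOnOnes(ints))
            Console.WriteLine(string.Join(" ", chunk));
    }
}
```

For the sample input this prints the five chunks listed above, one per line. The observable version has to do the same thing, except that a chunk can only be handed on once the next 1 (or the end of the stream) has been seen.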

My approach so far

I thought a good solution would be to first convert it into an IObservable<IObservable<int>> via the Window method and then use ToEnumerable, but somehow I can't get it to work. I used Zip and Skip(1) to get a diff to the previous element, and I tried DistinctUntilChanged(), too. I'll spare you all the variants I tried...

Probably the closest I came was this code:

int[] ints = new[] { 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1 };
var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Take(11).Select(i => ints[i]);

Subject<int> subject = new Subject<int>();
observable.Subscribe(subject);

var observableDiff = subject.Skip(1).Zip(subject, (n, p) => new { Previous = p, Next = n });
var windows = observable.Window(() => observableDiff.Where(x => x.Next == 1));

int index = 0;
windows.Subscribe(window =>
{
  Console.WriteLine(string.Format("new window [{0}] ", index++));
  window.Subscribe(number => Console.WriteLine(number));
});

That gives good results at first, but unfortunately it crashes at the end:

new window [0]
1
new window [1]
1
0
0
0
new window [2]
1
0
0
new window [3]
1
0
new window [4]
new window [5]
new window [6]
new window [7]
new window [8]
new window [9]
<-- this goes on until roughly window [80], where it fails with a stack overflow exception

If it weren't for that bug in my code, I would have what I need...

Any help would be much appreciated. :)

Edit: I use Rx-Experimental, but that doesn't make a difference (checked with LINQPad). I also removed the Subject, which didn't change anything. It seems that with my new approach (Edit2) you do need a Subject, otherwise the start of the windows is totally weird.

Edit2: Changed the question slightly to better highlight my problem, sorry. Also updated my solution.

hko asked Jan 12 '12 15:01



1 Answer

This worked for me:

var ints = (new[] { 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1 }).ToObservable();

var result =
    ints
        .Publish(ns =>
            ns
                .Where(n => n == 1)
                .Select(n =>
                    ns.TakeWhile(m => m == 0).StartWith(n).ToArray())
        ).Merge();

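I've used Publish to make sure that the ints observable is treated as "hot" rather than "cold". That way every inner ns.TakeWhile(...) shares the single subscription to the source instead of restarting it from the first element.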
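To try the query end to end, a small console sketch along the following lines could be used. It assumes the System.Reactive package is referenced. The class and method names (RxChunkSketch, SplitOnOnes) are made up for illustration, and the query itself is the one from this answer, only wrapped in a method.

```csharp
using System;
using System.Reactive.Linq;

static class RxChunkSketch
{
    // Hypothetical wrapper around the query from the answer above.
    public static IObservable<int[]> SplitOnOnes(IObservable<int> source)
    {
        return source
            .Publish(ns => ns
                .Where(n => n == 1)               // every 1 starts a new chunk
                .Select(n => ns
                    .TakeWhile(m => m == 0)       // collect the zeros that follow
                    .StartWith(n)                 // put the leading 1 in front
                    .ToArray()))                  // emit the chunk once it is complete
            .Merge();
    }

    static void Main()
    {
        var ints = new[] { 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1 }.ToObservable();

        // Print each chunk on its own line as it is emitted.
        SplitOnOnes(ints).Subscribe(
            chunk => Console.WriteLine(string.Join(" ", chunk)),
            () => Console.WriteLine("done"));
    }
}
```

Note that a chunk is only emitted once it is complete, that is, when the next 1 arrives or the source ends. With a slow source such as the Interval-based one in the question, each chunk therefore shows up one element later than its last 0.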

My results look like this:

[Image: Grouped ints]

Enigmativity answered Sep 28 '22 12:09
