Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Combining latest with previous value in an observable stream

I'm trying to figure out how to take an observable sequence of T and get both the most recent and previous values of T in my subscriber. Here is my spike code:

static void Main(string[] args)
{
    var latest = new Subject<string>();
    var previous = latest.SkipLast(1);
    var latestWithPrevious = latest.CombineLatest(previous, (l, p) => new { Latest = l, Previous = p });

    latestWithPrevious.Subscribe(x => Console.WriteLine("Latest: {0} Previous: {1}", x.Latest, x.Previous));

    latest.OnNext("1");
    latest.OnNext("2");
    latest.OnNext("3");

    Console.ReadKey();
}

I am wanting the following output:

Publishing 1
Publishing 2
Latest: 2 Previous: 1
Publishing 3
Latest: 3 Previous: 2

However, I am getting the following:

Publishing 1
Publishing 2
Latest: 2 Previous: 1
Publishing 3
Latest: 3 Previous: 1
Latest: 3 Previous: 2

So the publication of "3" to latest is triggering two ticks in latestWithPrevious: the first has an old value for Previous, and the second has the correct value for Previous.

How can I achieve my goal here?

like image 385
me-- Avatar asked Jul 16 '14 13:07

me--


1 Answers

Use Skip(1), not SkipLast(1). SkipLast skips the last N entries in the stream.

Also, CombineLatest will still trigger for each side when it changes so you will get more events than you want. Use Zip instead of CombineLatest. Zip only triggers after each side produces its next value.

Finally, reverse the semantics. latest is the series skipping the first value, previous is the series including the first value.

var source = new Subject<string>();
var previous = source;
var latest = source.Skip(1);
var latestWithPrevious = latest.Zip(previous, (l, p) => new { Latest = l, Previous = p});
like image 180
Brandon Avatar answered Oct 30 '22 06:10

Brandon