Have withLatestFrom wait until all sources have produced one value

I'm making use of the withLatestFrom operator in RxJS in the normal way:

var combined = source1.withLatestFrom(source2, source3);

...to actively collect the most recent emission from source2 and source3 and to emit all three values only when source1 emits.

But I cannot guarantee that source2 or source3 will have produced values before source1 produces a value. Instead I need to wait until all three sources produce at least one value each before letting withLatestFrom do its thing.

The contract needs to be: if source1 emits, then combined will always eventually emit once the other sources have each produced a value. If source1 emits multiple times while waiting for the other sources, we can use the latest value and discard the earlier ones.

Edit: as marble diagrams:

--1------------2---- (source)
----a-----b--------- (other1)
------x-----y------- (other2)
------1ax------2by--


--1------------2---- (source)
------a---b--------- (other1)
--x---------y------- (other2)
------1ax------2by--


------1--------2---- (source)
----a-----b--------- (other1)
--x---------y------- (other2)
------1ax------2by--
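
To pin the contract down independently of any particular operator, here is a minimal plain-JavaScript sketch of the behaviour the diagrams describe. It does not use RxJS: `makeCombiner` and its `push(index, value)` convention (index 0 is the source, higher indices are the other streams) are names invented for this illustration only.

```javascript
// Hypothetical helper, not part of RxJS: models the desired contract
// as a small state machine over `count` input streams.
// Stream 0 is the triggering source; streams 1..count-1 are the others.
function makeCombiner(count, emit) {
    var latest = new Array(count);            // most recent value per stream
    var seen = new Array(count).fill(false);  // which streams have emitted
    var started = false;                      // true after the first tuple

    return function push(index, value) {
        latest[index] = value;
        seen[index] = true;

        if (!started) {
            // Initial phase: like combineLatest, but emit only once,
            // as soon as every stream has produced at least one value.
            if (seen.every(Boolean)) {
                started = true;
                emit(latest.slice());
            }
        } else if (index === 0) {
            // Steady state: like withLatestFrom, emit only on the source.
            emit(latest.slice());
        }
    };
}

// Replay the first marble diagram: 1, a, x, b, y, 2
var results = [];
var push = makeCombiner(3, function (tuple) {
    results.push(tuple.join(''));
});
push(0, 1); push(1, 'a'); push(2, 'x');
push(1, 'b'); push(2, 'y'); push(0, 2);

console.log(results); // the two tuples "1ax" and "2by"
```

Replaying the other two diagrams through the same function gives the same two tuples, which is the property any operator-based solution would need to match.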

I can make a custom operator for this, but I want to make sure I'm not missing an obvious way to do it using the vanilla operators. It feels almost like I want combineLatest for the initial emit and then to switch to withLatestFrom from then on, but I haven't been able to figure out how to do that.

Edit: Full code example from final solution:

var Dispatcher = new Rx.Subject();
var source1 = Dispatcher.filter(x => x === 'foo');
var source2 = Dispatcher.filter(x => x === 'bar');
var source3 = Dispatcher.filter(x => x === 'baz');

var combined = source1.publish(function(s1) {
    return source2.publish(function(s2) {
        return source3.publish(function(s3) {
            var cL = s1.combineLatest(s2, s3).take(1).do(() => console.log('cL'));
            var wLF = s1.skip(1).withLatestFrom(s2, s3).do(() => console.log('wLF'));

            return Rx.Observable.merge(cL, wLF);
        });
    });
});

var sub1 = combined.subscribe(x => console.log('x', x));

// These can arrive in any order
// and we can get multiple values from any one.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
Dispatcher.onNext('baz');

// combineLatest triggers once we have all values.
// cL
// x ["foo", "bar", "baz"]

// withLatestFrom takes over from there.
Dispatcher.onNext('foo');
Dispatcher.onNext('bar');
Dispatcher.onNext('foo');
// wLF
// x ["foo", "bar", "baz"]
// wLF
// x ["foo", "bar", "baz"]

whiteinge asked Aug 23 '16 09:08


1 Answer

I think the answer is more or less as you described: let the first value come from combineLatest, then switch to withLatestFrom. My JS is hazy, but I think it would look something like this:

// Combine the three latest values into a single result (here, an array).
var selector = function(x, y, z) { return [x, y, z]; };

var combined = Rx.Observable.concat(
    source1.combineLatest(source2, source3, selector).take(1),
    source1.withLatestFrom(source2, source3, selector)
);

You should probably use publish to avoid multiple subscriptions, so that would look like this:

var combined = source1.publish(function(s1)
{
    return source2.publish(function(s2)
    {
        return source3.publish(function(s3)
        {
            return Rx.Observable.concat(
                s1.combineLatest(s2, s3, selector).take(1),
                s1.withLatestFrom(s2, s3, selector)
            );
        });
    });
});

or using arrow functions...

var combined = source1.publish(s1 => source2.publish(s2 => source3.publish(s3 => 
    Rx.Observable.concat(
        s1.combineLatest(s2, s3, selector).take(1),
        s1.withLatestFrom(s2, s3, selector)
    )
)));

EDIT:

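I see the problem with concat: it only subscribes to its second argument after the first one completes, so withLatestFrom misses the values the other sources have already emitted. I think the following would work: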

var combined = source1.publish(s1 => source2.publish(s2 => source3.publish(s3 => 
    Rx.Observable.merge(
        s1.combineLatest(s2, s3, selector).take(1),
        s1.skip(1).withLatestFrom(s2, s3, selector)
    )
)));

...so take one value using combineLatest, then get the rest using withLatestFrom.
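
The difference between the concat and merge versions comes down to subscription timing on hot sources. As a rough illustration in plain JavaScript (no RxJS involved), the sketch below uses `TinySubject`, a made-up stand-in for a published source, to show that an observer attached late never sees earlier values.

```javascript
// TinySubject is a hypothetical stand-in for a hot observable,
// written only for this illustration; it is not an RxJS class.
function TinySubject() {
    this.observers = [];
}
TinySubject.prototype.subscribe = function (fn) {
    this.observers.push(fn);
};
TinySubject.prototype.next = function (value) {
    // Values go only to observers that are subscribed right now.
    this.observers.forEach(function (fn) { fn(value); });
};

var hot = new TinySubject();
var early = [];
var late = [];

// Subscribed up front, like the combineLatest branch.
hot.subscribe(function (v) { early.push(v); });
hot.next('bar');
hot.next('baz');

// concat attaches its second branch only after the first completes,
// so the withLatestFrom branch would subscribe about here.
hot.subscribe(function (v) { late.push(v); });
hot.next('bar2');

console.log(early); // all three values
console.log(late);  // only the value sent after subscribing
```

With merge, both branches subscribe at the start, so the withLatestFrom branch has already recorded the other sources' values by the time it is needed; skip(1) then drops the first source value so it is not emitted by both branches.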

Shlomo answered Sep 24 '22 17:09