I am trying to use rxjs in my project. I have following sequences, what I expected is that the 1rd sequence will only be handled after a value arrive in another sequence, and only the latest value in the 1st sequence will be reserved. Any suggestion for it?
s1$ |a---b----c-
s2$ |------o----
expected result:
s3$ |------b--c-
I'd combine sample()
that is already very similar to what you need and skipUntil()
.
const start = Scheduler.async.now();
const trigger = new Subject();
const source = Observable
.timer(0, 1000)
.share();
Observable.merge(source.sample(trigger).take(1), source.skipUntil(trigger))
.subscribe(val => console.log(Scheduler.async.now() - start, val));
setTimeout(() => {
trigger.next();
}, 2500);
This will output numbers starting with 2
.
source 0-----1-----2-----3-----4
trigger ---------------X---------
output ---------------2--3-----4
Console output with timestamps:
2535 2
3021 3
4024 4
5028 5
Alternatively you could use switchMap()
and ReplaySubject
but it's probably not as obvious as the previous example and you need two Subjects.
const start = Scheduler.async.now();
const trigger = new Subject();
const source = Observable
.timer(0, 1000)
.share();
const replayedSubject = new ReplaySubject(1);
source.subscribe(replayedSubject);
trigger
.switchMap(() => replayedSubject)
.subscribe(val => console.log(Scheduler.async.now() - start, val));
setTimeout(() => {
trigger.next();
}, 2500);
The output is exactly the same.
I guess I would do this using a ReplaySubject
:
const subject$ = new Rx.ReplaySubject(1)
const one$ = Rx.Observable.interval(1000)
const two$ = Rx.Observable.interval(2500)
one$.subscribe(subject$)
const three$ = two$
.take(1)
.flatMap(() => subject$)
// one$ |----0---1---2---3---4---
// two$ |----------0---------1---
// three$ |----------1-2---3---4---
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With