I have 2 observables and I would like to achieve the following:
Get a value from Observable1, then ignore Observable1 and wait only for a value from Observable2. Then do the same in reverse: get a value from Observable1 again, and so on.
Is there a way to achieve this in RxJS? The Operator Decision Tree hasn't been useful. I also played around with switchMap(), but that can only do the first part.
I created a custom operator (toggleEmit) for you that takes N observables and toggles between them.
const result$ = toggleEmit(source1$, source2$);
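Implemented features:
N observables can be passed to the toggleEmit operator.
Values are taken in turn from index 0 up to the last index (N - 1). After the last observable emits, it starts again at 0.
Repeated emits from the same observable are dropped until the next observable in the sequence has emitted.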
FYI: The code is actually pretty straightforward. It just looks big because I added several comments to avoid confusion. If you have questions, leave a comment and I will try to answer.
const { Subject, merge } = rxjs;
const { map, scan, distinctUntilChanged, filter } = rxjs.operators;
const source1$ = new Subject();
const source2$ = new Subject();
function toggleEmit(...observables) {
  // Number of observables
  const amount = observables.length;
  // Create the state object that holds the last accepted index and value
  const createState = (value, index) => ({ index, value });
  /*
   * This function updates the state on every emit.
   * Keep in mind that updateState is three nested functions:
   * 1. called directly: updateState(index, amount)
   * 2. called by the map operator: map(val => updateState(index, amount)(val)), written in shorthand below
   * 3. called by the scan operator: fn(state)
   */
  const updateState = (index, amount) => update => state =>
    // Initial state is empty and the emit comes from index 0
    Object.keys(state).length == 0 && index == 0
    // New index is one higher than the last accepted index
    || index == state.index + 1
    // New index is 0 and the last accepted index was the final observable
    || state.index == amount - 1 && index == 0
      ? createState(update, index)
      : state;
  // Used to avoid emitting for the same index twice in a row
  const noDoubleEmit = (prev, curr) => prev.index == curr.index;
  return merge(
    ...observables.map((observable, index) =>
      observable.pipe(map(updateState(index, amount)))
    )
  ).pipe(
    scan((state, fn) => fn(state), {}),
    filter(state => Object.keys(state).length != 0),
    distinctUntilChanged(noDoubleEmit),
    map(state => state.value),
  );
}
const result$ = toggleEmit(source1$, source2$);
result$.subscribe(console.log);
source2$.next(0);
source1$.next(1);
source2$.next(2);
source2$.next(3);
source1$.next(4);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>
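By my reading, the snippet above logs 1, 2 and 4. If it helps, the turn-taking rule can be sketched without RxJS as a plain loop over tagged events. This is only an illustration, not the operator itself: isNextInTurn and toggleTrace are hypothetical names, and the modulo check is my shorthand for the three conditions in updateState (with a single source it accepts every value, which the operator above would not).

```javascript
// Decide whether `index` belongs to the source whose turn it is.
// `lastIndex` is null until the first value has been accepted.
function isNextInTurn(lastIndex, index, amount) {
  if (lastIndex === null) return index === 0;
  return index === (lastIndex + 1) % amount;
}

// Walk over events tagged with their source index and keep only the in-turn values.
function toggleTrace(events, amount) {
  const accepted = [];
  let lastIndex = null;
  for (const { index, value } of events) {
    if (isNextInTurn(lastIndex, index, amount)) {
      lastIndex = index;
      accepted.push(value);
    }
  }
  return accepted;
}

// Same emission order as the snippet: source1$ is index 0, source2$ is index 1.
const events = [
  { index: 1, value: 0 }, // dropped: nothing accepted yet and this is not index 0
  { index: 0, value: 1 }, // accepted
  { index: 1, value: 2 }, // accepted
  { index: 1, value: 3 }, // dropped: index 1 already had its turn
  { index: 0, value: 4 }, // accepted
];
const accepted = toggleTrace(events, 2);
console.log(accepted); // [ 1, 2, 4 ]
```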
Seems like there are quite a few ways to solve this. Here's another with expand.
import { interval } from 'rxjs';
import { expand, mapTo, take } from 'rxjs/operators';
const left = interval(1000).pipe(mapTo('left'));
const right = interval(5000).pipe(mapTo('right'));
const example = left.pipe(
  take(1),
  expand(x => (x === 'left' ? right : left).pipe(take(1)))
);
const subscribe = example.subscribe(val => console.log(val));
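As I understand it, the expand version works by listening to only one source at a time: take(1) ends the current subscription after one value, and expand then subscribes to the other source. A rough sketch of that idea without RxJS is below. createSource and alternate are hypothetical helpers made up for illustration; they stand in for a hot source and for the take(1) plus expand combination, and are not part of any library.

```javascript
// Minimal hand-rolled event source (hypothetical stand-in for a hot observable).
function createSource() {
  const listeners = new Set();
  return {
    next(value) {
      // Copy first so a listener may unsubscribe while being notified.
      [...listeners].forEach(listener => listener(value));
    },
    subscribe(listener) {
      listeners.add(listener);
      return () => listeners.delete(listener);
    },
  };
}

// Listen to one source at a time; after its first value, move on to the next one.
function alternate(sources, onValue) {
  let unsubscribe = () => {};
  const listenTo = index => {
    unsubscribe = sources[index].subscribe(value => {
      unsubscribe();                          // same role as take(1)
      onValue(value);
      listenTo((index + 1) % sources.length); // same role as the recursion in expand
    });
  };
  listenTo(0);
  return () => unsubscribe();
}

const leftSource = createSource();
const rightSource = createSource();
const seen = [];
alternate([leftSource, rightSource], value => seen.push(value));

rightSource.next('r0'); // ignored: nobody is listening to right yet
leftSource.next('l1');  // taken
leftSource.next('l2');  // ignored: it is right's turn
rightSource.next('r3'); // taken
leftSource.next('l4');  // taken

console.log(seen); // [ 'l1', 'r3', 'l4' ]
```

One consequence of this design: with cold sources such as interval, every turn is a fresh subscription, so the timer starts over each time instead of continuing in the background.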
I'd be interested if someone has a way to do it with one of the buffer* operators.