rxjs multiple observable switch

I have 2 observables and I would like to achieve the following:

Get a value from Observable1, then ignore Observable1 and wait only for a value from Observable2. Then do the same in reverse: get a value from Observable1 again, and so on.

Is there a way to achieve this in RxJS? The Operator Decision Tree hasn't been useful. I also played around with switchMap(), but that only handles the first part.

filype asked Jun 04 '21 04:06


2 Answers

I created a custom operator (toggleEmit) for you that takes N observables and toggles between them.

const result$ = toggleEmit(source1$, source2$);

Implemented features:

  • N observables can be passed to toggleEmit
  • The observables take turns in order, from index 0 to index N - 1. After the last observable emits, the cycle starts again at index 0
  • Each observable can emit only once per cycle. Further emissions from the same observable before its next turn are dropped
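
As a rough illustration of the rules in this list, here is a minimal plain-JavaScript sketch that models the turn-taking as a loop over tagged events, without RxJS. The function name takeTurns and the { index, value } event shape are hypothetical and used only for this example. It is my reading of the rules, not the answer's actual implementation.

```javascript
// Plain-JavaScript model of the turn-taking rule (no RxJS required).
// `takeTurns` and the { index, value } event shape are hypothetical names
// introduced only for this illustration.
function takeTurns(events, amount) {
  const emitted = [];
  let expected = 0; // index of the source whose turn it is

  for (const { index, value } of events) {
    if (index !== expected) continue; // not this source's turn: drop the value
    emitted.push(value);
    expected = (expected + 1) % amount; // pass the turn on, wrapping back to 0
  }
  return emitted;
}

// The same sequence of emissions as the answer's own test run:
// source2 -> 0, source1 -> 1, source2 -> 2, source2 -> 3, source1 -> 4
const events = [
  { index: 1, value: 0 },
  { index: 0, value: 1 },
  { index: 1, value: 2 },
  { index: 1, value: 3 },
  { index: 0, value: 4 },
];

console.log(takeTurns(events, 2)); // logs the array [1, 2, 4]
```

The values 0 and 3 are dropped because they arrive when it is the other source's turn.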

FYI: The code below is actually pretty straightforward. It only looks long because I added several comments to avoid confusion. If you have questions, leave a comment and I will try to answer.

const { Subject, merge } = rxjs;
const { map, scan, distinctUntilChanged, filter } = rxjs.operators;

const source1$ = new Subject();
const source2$ = new Subject();

function toggleEmit(...observables) {
  // number of source observables
  const amount = observables.length;

  // build the state object: the index and value of the last accepted emission
  const createState = (value, index) => ({ index, value });

  /*
  * Updates the state on every emission.
  * updateState is curried, so it is called in 3 steps:
  * 1. directly, when the pipeline is built: updateState(index, amount)
  * 2. by the map operator: map(val => updateState(index, amount)(val)),
  *    written in shorthand as map(updateState(index, amount))
  * 3. by the scan operator: fn(state)
  */
  const updateState = (index, amount) => update => state =>
    // Check initial object for being empty and index 0
    Object.keys(state).length == 0 && index == 0
    // Check if new index is one higher
    || index == state.index + 1
    // Check if new index is at 0 and last was at end of observables
    || state.index == amount - 1 && index == 0
      ? createState(update, index)
      : state
  
  // Comparator for distinctUntilChanged: suppresses an emission whose state has the same index as the previous one
  const noDoubleEmit = (prev, curr) => prev.index == curr.index;

  return merge(
    ...observables.map((observable, index) =>
      observable.pipe(map(updateState(index, amount)))
    )
  ).pipe(
    scan((state, fn) => fn(state), {}),
    filter(state => Object.keys(state).length != 0),
    distinctUntilChanged(noDoubleEmit),
    map(state => state.value),
  );
}

const result$ = toggleEmit(source1$, source2$);

result$.subscribe(console.log);

source2$.next(0);
source1$.next(1);
source2$.next(2);
source2$.next(3);
source1$.next(4);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>
Jonathan Stellwag answered Sep 28 '22 08:09


Seems like there are quite a few ways to solve this. Here's another with expand.

import { interval } from 'rxjs';
import { expand, mapTo, take } from 'rxjs/operators';

const left = interval(1000).pipe(mapTo('left'));
const right = interval(5000).pipe(mapTo('right'));
const example = left.pipe(
  take(1),
  expand(x => (x === 'left' ? right : left).pipe(take(1)))
);
const subscription = example.subscribe(val => console.log(val));
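
For readers unfamiliar with expand, here is a simplified synchronous model of the recursion it performs: every emitted value is passed back into the projection function, whose results are emitted and fed back in turn. The helper expandSync and its limit parameter are hypothetical and stand in for expand and take. This is a sketch of the idea only, not how RxJS implements it.

```javascript
// Simplified, synchronous model of what `expand` does (not RxJS's implementation).
// `expandSync` is a hypothetical helper: `project` returns an array of follow-up
// values, and `limit` caps the output the way `take(limit)` would.
function expandSync(seed, project, limit) {
  const output = [];
  const queue = [seed];

  while (queue.length > 0 && output.length < limit) {
    const value = queue.shift();
    output.push(value);            // every value is emitted...
    queue.push(...project(value)); // ...and also fed back into `project`
  }
  return output;
}

// Mirror of the projection used with expand: after 'left' comes 'right', and vice versa.
const nextSide = x => [x === 'left' ? 'right' : 'left'];

console.log(expandSync('left', nextSide, 5));
// logs ['left', 'right', 'left', 'right', 'left']
```

One thing the model hides: in the real pipeline each step subscribes anew to left or right. With cold observables such as interval, that means the timer restarts on every switch, so this approach may behave differently from one that keeps a single subscription to hot sources.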

I'd be interested to see whether someone has a way to do it with one of the buffer* operators.

NickL answered Sep 28 '22 08:09