
How to branch off of an rxjs stream conditionally?

I am trying to simulate the "brush" feature like the one in any image editor.

I have the following streams:

  • $pointerDown: pointer pressed down
  • $pointerUp: pointer released
  • $position: position of the brush
  • $escape: Escape key pressed

What I want to do

While the user is dragging the pointer, do temporary calculations. When the pointer is released, commit those changes. If the Escape key is pressed instead, do not commit them.

So far I only handle the pointer-up case:

// Assumes the operators are imported as a namespace: import * as r from 'rxjs/operators'
$pointerDown.pipe(
  r.switchMap(() =>
    $position.pipe(
      r.throttleTime(150),
      r.map(getNodesUnderBrush),
      r.tap(prepareChanges),
      r.takeUntil($pointerUp),
      r.finalize(commitBrushStroke),
    ),
  ),
).subscribe()

How can I end the stream in two different ways? What is the idiomatic RxJS way to do this?

Thanks

Mehdi Saffar asked Nov 06 '22 09:11


1 Answer

Your question comes down to keeping state over time. Here the state is the pointer down/move/drag sequence, which needs to be accumulated or cleared and finally emitted. For this kind of stateful scenario I always like to use the scan operator:

Pre

To keep the example simple, I did not use your predefined observables. If you have trouble adapting this very similar example to your specific pointer use case, I can try to update it so it is closer to your question.

1. What could represent my state

Here I use an enum [status] to record which event happened last, so later steps can react to it, and an accumulator [acc] for the points collected over time.

interface State {
  acc: number[],
  status: Status
}

enum Status {
  init,
  move,
  finish,
  escape
}

const DEFAULT_STATE: State = {
  acc: [],
  status: Status.init
}

2. Write functions that update the state

Your requirement can be split into: accumulate [pointerdown$ + position$], finish [pointerup$], escape [escape$]

const accumulate = (index: number) => (state: State): State =>
({status: Status.move, acc: [...state.acc, index]});

const finish = () => (state: State): State =>
({status: Status.finish, acc: state.acc})

const escape = () => (state: State): State =>
({status: Status.escape, acc: []})

3. Map your functions to your observables

merge(
  move$.pipe(map(accumulate)),
  finish$.pipe(map(finish)),
  escape$.pipe(map(escape))
)

4. Use the functions in the scan where your state over time is placed

scan((acc: State, fn: (state: State) => State) => fn(acc), DEFAULT_STATE)
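
Step 4 can be read as a running reduce: scan applies each incoming function to the previous state and emits the result. The sketch below mimics that with a plain array and Array.prototype.reduce, so it does not need RxJS at all. runningStates is a hypothetical helper name, and the types and functions from steps 1 and 2 are repeated only to keep the sketch self-contained.

```typescript
// Shapes from steps 1 and 2, repeated so this file stands alone.
enum Status { init, move, finish, escape }
interface State { acc: number[]; status: Status }
type Reducer = (state: State) => State;

const DEFAULT_STATE: State = { acc: [], status: Status.init };

const accumulate = (index: number): Reducer => (state) =>
  ({ status: Status.move, acc: [...state.acc, index] });
const finish = (): Reducer => (state) =>
  ({ status: Status.finish, acc: state.acc });
const escape = (): Reducer => (state) =>
  ({ status: Status.escape, acc: [] });

// Hypothetical helper: collects every intermediate state,
// in the same order in which scan would emit them.
export function runningStates(reducers: Reducer[], seed: State): State[] {
  const states: State[] = [];
  reducers.reduce((state, fn) => {
    const next = fn(state);
    states.push(next);
    return next;
  }, seed);
  return states;
}

// One move, an escape, another move, then a finish.
const states = runningStates(
  [accumulate(1), escape(), accumulate(2), finish()],
  DEFAULT_STATE,
);

// Same condition as the filter in step 5.
const committed = states.filter((s) => s.status === Status.finish);
console.log(committed.map((s) => s.acc));
```

Because every function returns a new object instead of changing the previous one, each intermediate state can be inspected or logged without affecting the next step.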

5. Process your mutated state

We only want to process the state once a stroke has finished, so we filter on that status:

filter(state => state.status === Status.finish),
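
Putting steps 1 to 5 together could look roughly like this. It is a sketch under assumptions, not the original StackBlitz: move$, finish$ and escape$ are assumed to be plain Subjects standing in for the real pointer and keyboard streams, and committed is a hypothetical array used only to observe what passes the filter.

```typescript
import { Subject, merge } from 'rxjs';
import { filter, map, scan } from 'rxjs/operators';

enum Status { init, move, finish, escape }
interface State { acc: number[]; status: Status }
type Reducer = (state: State) => State;

const DEFAULT_STATE: State = { acc: [], status: Status.init };

const accumulate = (index: number): Reducer => (state) =>
  ({ status: Status.move, acc: [...state.acc, index] });
const finish = (): Reducer => (state) =>
  ({ status: Status.finish, acc: state.acc });
const escape = (): Reducer => (state) =>
  ({ status: Status.escape, acc: [] });

// Assumed stand-ins for the real event sources.
const move$ = new Subject<number>();
const finish$ = new Subject<void>();
const escape$ = new Subject<void>();

// Hypothetical sink: collects the accumulated points of every finished stroke.
const committed: number[][] = [];

merge(
  move$.pipe(map(accumulate)),   // each value becomes a state-updating function
  finish$.pipe(map(finish)),
  escape$.pipe(map(escape)),
).pipe(
  // Apply each function to the previous state, starting from DEFAULT_STATE.
  scan((state: State, fn: Reducer) => fn(state), DEFAULT_STATE),
  // Only finished strokes get through; escaped ones never reach the subscriber.
  filter((state) => state.status === Status.finish),
).subscribe((state) => committed.push(state.acc));

move$.next(1);
escape$.next();  // discards [1]
move$.next(2);
finish$.next();  // commits [2]
```

Note that finish keeps acc as it is, so a move$ emission right after a finish would extend the already committed list. If each stroke should start empty, accumulate would need to reset acc when the previous status is finish.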

Inner state sample

  1. move$.next(1) = State: {status: move, acc: [1]}
  2. escape$.next() = State: {status: escape, acc: []}
  3. move$.next(2) = State: {status: move, acc: [2]}
  4. finish$.next() = State: {status: finish, acc: [2]}

Running stackblitz

FYI: It is pretty hard to get into the mindset of solving state problems with RxJS. But once you understand the flow behind the idea, you can use it in nearly every stateful scenario. You avoid side effects, stick to the RxJS workflow, and can easily optimize and debug your code.

Jonathan Stellwag answered Nov 14 '22 05:11