
Feedback loop without Subject in RX

I have the following motion equations:

move = target_position - position
position = position + move

where target_position is a stream and position is initialized to zero. I would like to obtain position as a stream as well. I have tried something like this (in Rx pseudocode):

moves = Subject()
position = moves.scan(sum,0)
target_position.combine_latest(position,diff).subscribe( moves.on_next)

It works, but I have read that Subjects should be avoided. Is it possible to compute the position stream without a Subject?
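To make the two equations concrete, here is a minimal plain-Python sketch with no Rx dependency. It unrolls the recurrence over a finite list of targets instead of a stream; the names step and positions are illustrative only and do not come from any library, and the initial= argument of accumulate requires Python 3.8 or later.

```python
from itertools import accumulate


def step(position, target):
    """Apply one update of the motion equations."""
    move = target - position   # move = target_position - position
    return position + move     # position = position + move


def positions(targets, initial=0.0):
    """Return the initial position followed by the position after each target."""
    return list(accumulate(targets, step, initial=initial))


print(positions([2.0, 3.0, 4.0]))  # [0.0, 2.0, 3.0, 4.0]
```

Since position + (target_position - position) equals target_position, each position lands exactly on the latest target after one update.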

In Python (RxPY), the full Subject-based implementation looks like this:

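from pprint import pprint
from rx.subjects import Subject


def compute_next_move(target, current):
    # Not defined in the original snippet; assumed from the equations above:
    # move = target_position - position
    return target - current


target_position = Subject()
moves = Subject()

# position is the running sum of all moves, starting from zero
position = moves.scan(lambda x, y: x + y, 0.0)

# Feed every non-zero move back into `moves`, which closes the loop
target_position \
    .combine_latest(position, compute_next_move) \
    .filter(lambda x: abs(x) > 0) \
    .subscribe(moves.on_next)

position.subscribe(lambda x: pprint("position is now %s" % x))

# Seed the loop with a zero move so that position has an initial value
moves.on_next(0.0)
target_position.on_next(2.0)
target_position.on_next(3.0)
target_position.on_next(4.0)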
Louis Chiffre asked Oct 20 '22 01:10



1 Answer

You could use the expand operator, which feeds every value it emits back into its projection function:

targetPosition.combineLatest(position, (target, current) => [target, current])
  .expand(([target, current]) => {
    // if you've reached your target, stop
    if (target === current) {
      return Observable.empty();
    }
    // otherwise, calculate the new position and emit it together with
    // the target, so the pair can be pumped back into `expand`
    const newPosition = calcPosition(target, current);
    return Observable.just([target, newPosition]);
  })
  // each emitted value is a [target, current] pair
  .subscribe(updateThings);
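If the recursion in expand is hard to picture, the following plain-Python sketch may help. It is only a rough analog, not the Rx operator: it works on lists instead of observables and processes values breadth-first. The functions expand, calc_position and next_state are hypothetical helpers written for this illustration, and the rule of moving at most max_step per update is an assumption chosen so that several steps are visible.

```python
from collections import deque


def expand(seeds, project):
    """Yield each seed, then every value obtained by recursively projecting it."""
    queue = deque(seeds)
    while queue:
        value = queue.popleft()
        yield value
        queue.extend(project(value))  # results are fed back into the loop


def calc_position(target, current, max_step=1.0):
    # Hypothetical update rule: move toward the target by at most max_step
    move = max(-max_step, min(max_step, target - current))
    return current + move


def next_state(state):
    target, current = state
    if target == current:
        return []  # plays the role of Observable.empty(): stop recursing
    return [(target, calc_position(target, current))]


trajectory = [current for _, current in expand([(3.0, 0.0)], next_state)]
print(trajectory)  # [0.0, 1.0, 2.0, 3.0]
```

The projection has to return a value of the same shape it receives, here a (target, current) pair, because each result is passed straight back into it.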
Ben Lesh answered Oct 22 '22 21:10
