Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RxJs: lossy form of zip operator

Consider using the zip operator to zip together two infinite Observables, one of which emits items twice as frequently as the other.
The current implementation is loss-less, i.e. if I keep these Observables emitting for an hour and then I switch between their emitting rates, the first Observable will eventually catch up with the other.
This will cause memory explosion at some point as the buffer grows larger and larger.
The same will happen if first observable will emit items for several hours and the second will emit one item at the end.

How do I achieve lossy behavior for this operator? I just want to emit anytime I get emissions from both streams and I don't care how many emissions from the faster stream I miss.

Clarifications:

  • Main problem I'm trying to solve here is memory explosion due to the loss-less nature of zip operator.
  • I want to emit anytime I get emissions from both streams even if both streams emit the same value every time

Example:

Stream1: 1 2    3 4    5 6 7                
Stream2:     10     20       30 40 50 60 70

Regular zip will produce the following output:

[1, 10]
[2, 20]
[3, 30]
[4, 40]
[5, 50]
[6, 60]
[7, 70]

const Observable = Rx.Observable;
const Subject = Rx.Subject;


const s1 = new Subject();
const s2 = new Subject();

Observable.zip(s1,s2).subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30); 
 
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>

The output I'd like it to produce:

[1, 10]
[3, 20]
[5, 30]

Explanation:
Lossy zip operator is zip with buffer size 1. That means it will only keep the first item from the stream that emitted first and will loose all the rest (items that arrive between first item and first emission from the second stream). So what happens in the example is the following: stream1 emits 1, lossy zip "remembers" it and ignores all the items on stream1 until stream2 emits. First emission of stream2 is 10 so stream1 looses 2. After mutual emission (the first emission of lossy zip) it starts over: "remember" 3, "loose" 4, emit [3,20]. Then start over: "remember" 5, "loose" 6 and 7, emit [5,30]. Then start over: "remember" 40, "loose" 50,60,70 and wait for the next item on stream1.

Example 2:

Stream1: 1 2 3 ... 100000000000
Stream2:                        a

Regular zip operator will explode the memory in this case.
I don't want it to.

Summary:
Essentially I expect the lossy zip operator to remember only the first value emitted by stream 1 after previous mutual emission and emit when stream 2 catches up with stream 1. And repeat.

like image 500
JeB Avatar asked Oct 06 '17 09:10

JeB


2 Answers

The following will give you the desired behavior:

Observable.zip(s1.take(1), s2.take(1)).repeat()

In RxJs 5.5+ pipe syntax:

zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>

Explanation:

  • repeat operator (in its current implementation) resubscribes to the source observable upon the latter's completion, i.e. in this particular case it resubscribes to zip upon every mutual emission.
  • zip combines two observables and waits for both of them to emit. combineLatest will do as well, it doesn't really matter because of take(1)
  • take(1) actually takes care of memory explosion and defines lossy behavior

If you want to take the last instead of the first value from each stream upon mutual emission use this:

Observable.combineLatest(s1, s2).take(1).repeat()

In RxJs 5.5+ pipe syntax:

combineLatest(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.combineLatest(s1,s2).take(1).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>
like image 162
JeB Avatar answered Sep 24 '22 21:09

JeB


This gives the sequence [ 0, 2 ] [ 1, 5 ] [ 2, 8 ] [ 3, 12 ] ...

const interval1 = Rx.Observable.interval(1000)
const interval2 = Rx.Observable.interval(300)

const combined = Rx.Observable.combineLatest(interval1, interval2);
const fresh = combined.scan((acc, x) => { 
    return x[0] === acc[0] || x[1] === acc[1] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

fresh.subscribe(console.log);

with arguably fewer operators. Not sure how efficient it is though.
CodePen

For update #3,

Then you'd need a key for each source item.

// Simulated sources according to latest spec provided (update #3)
const source1 = Rx.Observable.from(['x','y','z'])
const source2 = Rx.Observable.from(['a','a','b','b','c'])

// Create keys for sources
let key1 = 0
let key2 = 0
const keyed1 = source1.map(x => [x, key1++])
const keyed2 = source2.map(x => [x, key2++])

const combined = Rx.Observable
  .combineLatest(keyed1, keyed2)
  .map(([keyed1, keyed2]) => [...keyed1, ...keyed2]) // to simplify scan below
combined.subscribe(console.log) // not the output, for illustration only
console.log('-------------------------------------')

const fresh = combined.scan((acc, x) => { 
    return x[1] === acc[1] || x[3] === acc[3] ? acc : x 
  })
  .distinctUntilChanged() //fresh ones only

const dekeyed = fresh
  .map(keyed => { return [keyed[0], keyed[2]] })
dekeyed.subscribe(console.log); // required output

This produces

["x", "a"]  
["y", "a"]  
["z", "b"]  

CodePen (refresh CodePen page after opening console, for better display)

like image 36
Richard Matsen Avatar answered Sep 24 '22 21:09

Richard Matsen