Is there a good way to check if not completed Observable is empty at that exact time?
let cache = new ReplaySubject<number>(1);
...
// Here I want to know if 'cache' still empty or not. And, for example, fill it with initial value.
cache.isEmpty().subscribe(isEmpty => {
if (isEmpty) {
console.log("I want to be here!!!");
cache.next(0);
}
});
// but that code does not work until cache.complete()
Actually, it's not that simple and the accepted answer is not very universal. You want to check whether ReplaySubject
is empty at this particular point in time.
However, if you want to make this truly compatible with ReplaySubject
you need to take into account also windowTime
parameter that specifies "time to live" for each value that goes through this object. This means that whether your cache
is empty or not will change in time.
ReplaySubject
has method _trimBufferThenGetEvents
that does what you need. Unfortunately, this method is private so you need to make a little "hack" in JavaScript and extend its prototype
directly.
import { ReplaySubject } from 'rxjs';
// Tell the compiler there's a isNowEmpty() method
declare module "rxjs/ReplaySubject" {
interface ReplaySubject<T> {
isNowEmpty(): boolean;
}
}
ReplaySubject.prototype['isNowEmpty'] = function() {
let events = this._trimBufferThenGetEvents();
return events.length > 0;
};
Then using this ReplaySubject
is simple:
let s = new ReplaySubject<number>(1, 100);
s.next(3);
console.log(s.isNowEmpty());
s.next(4);
setTimeout(() => {
s.next(5);
s.subscribe(val => console.log('cached:', val));
console.log(s.isNowEmpty());
}, 200);
setTimeout(() => {
console.log(s.isNowEmpty());
}, 400);
Note that some calls to isNowEmpty()
return true
, while others return false
. For example the last one returns false
because the value was invalidated in the meantime.
This example prints:
true
cached: 5
true
false
See live demo: https://jsbin.com/sutaka/3/edit?js,console
You could use .scan()
to accumulate your count, and map that to a boolean whether it's nonzero. (It takes a second parameter for a seed value which would make it start with a 0, so it always reflects the current count.)
I've also added a .filter() instead of an if statement to make it cleaner:
let cache = new ReplaySubject<number>(1);
cache
.map((object: T) => 1)
.scan((count: number, incoming: number) => count + incoming, 0)
.map((sum) => sum == 0)
.filter((isEmpty: boolean) => isEmpty)
.subscribe((isEmpty: boolean) => {
console.log("I want to be here!!!");
cache.next(0);
});
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With