I need to filter entries emitted by an observable by checking the entry against some web service. The normal observable.filter operator is not suitable here, as it expects the predicate function to return the verdict synchronously, but in this situation, the verdict can only be retrieved asynchronously.
I can make do with the following code, but I was wondering whether there is a better operator I can use for this case.
someObservable.flatMap(function(entry) {
    // Pair each entry with its asynchronously retrieved verdict
    return Rx.Observable.fromNodeCallback(someAsynCheckFunc)(entry).map(function(verdict) {
        return {
            verdict: verdict,
            entry: entry
        };
    });
}).filter(function(obj) {
    return obj.verdict === true;
}).map(function(obj) {
    return obj.entry;
});
Here's how you'd implement such an operator using existing operators. There is one snag you need to think about: because your filter operation is async, new items can arrive faster than your filter operation can process them. What should happen in this case? Do you want to run the filters sequentially and guarantee that the order of your items is maintained? Or do you want to run the filters in parallel and accept that your items may come out in a different order?
Here are the two versions of the operator:
// runs the filters in parallel (order not guaranteed)
// predicate should return an Observable
Rx.Observable.prototype.flatFilter = function (predicate) {
    return this.flatMap(function (value, index) {
        return predicate(value, index)
            .filter(Boolean) // filter falsy values
            .map(function () { return value; });
    });
};

// runs the filters sequentially (order preserved)
// predicate should return an Observable
Rx.Observable.prototype.concatFilter = function (predicate) {
    return this.concatMap(function (value, index) {
        return predicate(value, index)
            .filter(Boolean) // filter falsy values
            .map(function () { return value; });
    });
};
Usage:
var predicate = Rx.Observable.fromNodeCallback(someAsynCheckFunc);
someObservable.concatFilter(predicate).subscribe(...);
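To make the ordering trade-off concrete, here is a minimal sketch that uses plain Promises instead of RxJS, so it runs in Node without any dependencies. The names `fakeCheck`, `filterParallel` and `filterSequential` are made up for this illustration; `fakeCheck` stands in for the web service and deliberately answers more slowly for smaller numbers.

```javascript
// Hypothetical async check: the verdict is true for even numbers.
// Smaller numbers take longer, so verdicts arrive in reverse input order.
const fakeCheck = (n) =>
  new Promise((resolve) => setTimeout(() => resolve(n % 2 === 0), (10 - n) * 10));

// Parallel (like flatFilter): start every check at once and
// collect values in the order their verdicts arrive.
function filterParallel(items, check) {
  const out = [];
  const pending = items.map((item) =>
    check(item).then((ok) => { if (ok) out.push(item); })
  );
  return Promise.all(pending).then(() => out);
}

// Sequential (like concatFilter): start a check only after
// the previous one has finished, so input order is preserved.
function filterSequential(items, check) {
  const out = [];
  return items
    .reduce(
      (chain, item) =>
        chain.then(() => check(item)).then((ok) => { if (ok) out.push(item); }),
      Promise.resolve()
    )
    .then(() => out);
}

filterParallel([1, 2, 3, 4], fakeCheck).then((r) => console.log('parallel:', r));
// parallel: [ 4, 2 ]
filterSequential([1, 2, 3, 4], fakeCheck).then((r) => console.log('sequential:', r));
// sequential: [ 2, 4 ]
```

The parallel run finishes sooner because all checks overlap, but the surviving items come out in completion order. The sequential run keeps the input order at the cost of waiting for each check in turn.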
Since RxJS 6.0, we have pipeable operators instead of method chaining on the Observable prototype.
So I updated the original code from this question to the RxJS 6 pipe style, improved with the information from the accepted answer.
I released version 1.0.0 of this package:
https://www.npmjs.com/package/filter-async-rxjs-pipe/v/1.0.0
The package has between 90 and 300 downloads a week now, so it seems pretty stable and I'm confident that it will do its job.
I have now refactored this code into an npm package:
https://www.npmjs.com/package/filter-async-rxjs-pipe
The serial variant with concatMap already works correctly, but the parallel variant with flatMap does not seem to run in parallel at the moment. Since I only need the concatMap version, I currently have all I need. If somebody has an idea how to write the parallel version correctly, please open an issue in the linked Git repository. :)
Note
Since I only need to pass a predicate function that returns a Promise, I wrote the conversion from Promise to Observable directly into the filterAsync method. If you need a predicate that returns an Observable instead, feel free to adjust the code.
import { from, MonoTypeOperatorFunction, pipe } from 'rxjs';
import { concatMap, filter, map } from 'rxjs/operators';

export function filterAsync<T>(predicate: (value: T, index: number) => Promise<boolean>): MonoTypeOperatorFunction<T> {
    return pipe(
        // Convert the predicate's Promise<boolean> to an observable (which resolves the promise),
        // then combine the boolean result with the input data in a container object.
        // concatMap supplies the emission index, so no counter shared between subscriptions is needed.
        concatMap((data: T, index: number) => {
            return from(predicate(data, index))
                .pipe(map((isValid) => ({filterResult: isValid, entry: data})));
        }),
        // Synchronously filter on the verdict stored in each container object
        filter(data => data.filterResult === true),
        // Unwrap the original entry from the container object
        map(data => data.entry)
    );
}
Here is a gist with the full ts file code, including imports:
https://gist.github.com/bjesuiter/288326f9822e0bc82389976f8da66dd8#file-filter-async-ts