Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is there an "async" version of filter operator in RxJs?

I need to filter entries emitted by an observable by checking the entry against some web service. The normal observable.filter operator is not suitable here, as it expects the predicate function to return the verdict synchronously, but in this situation, the verdict can only be retrieved asynchronously.

I can make shift by the following code, but I was wondering whether there is some better operator I can use for this case.

someObservable.flatmap(function(entry) {
  return Rx.Observable.fromNodeCallback(someAsynCheckFunc)(entry).map(function(verdict) {
    return {
      verdict: verdict,
      entry: entry
    };
  });
}).filter(function(obj) {
  return obj.verdict === true;
}).map(function(obj) {
  return obj.entry;
});
like image 923
victorx Avatar asked Feb 13 '15 00:02

victorx


2 Answers

Here's how you'd implement such an operator using existing operators. There is one snag you need to think about. Because your filter operation is async, it is possible for new items to arrive faster than your filter operation can process them. What should happen in this case? Do you want to run the filters sequentially and guarantee that the order of your items is maintained? Do you want to run the filters in parallel and accept that your items may come out in different order?

Here are the 2 versions of the operator

// runs the filters in parallel (order not guaranteed)
// predicate should return an Observable
Rx.Observable.prototype.flatFilter = function (predicate) {
    return this.flatMap(function (value, index) {
        return predicate(value, index)
            .filter(Boolean) // filter falsy values
            .map(function () { return value; });
    });
};

// runs the filters sequentially (order preserved)
// predicate should return an Observable
Rx.Observable.prototype.concatFilter = function (predicate) {
    return this.concatMap(function (value, index) {
        return predicate(value, index)
            .filter(Boolean) // filter falsy values
            .map(function () { return value; });
    });
};

Usage:

var predicate = Rx.Observable.fromNodeCallback(someAsynCheckFunc);
someObservable.concatFilter(predicate).subscribe(...);
like image 110
Brandon Avatar answered Nov 16 '22 05:11

Brandon


Update for RxJS 6+

Since RxJS version 6.0 we have pipe operators instead of observable prototype method chaining.

So I updated the original code of this request to RxJS 6 pipeline style improved by the information in the accepted answer.


Update 2

I released version 1.0.0 of this package:
https://www.npmjs.com/package/filter-async-rxjs-pipe/v/1.0.0
The package seems to have between 90 and 300 downloads a week now, so it seems pretty stable and I'm confident, that it will do it's job.


Update 1

I now refactored this code into a npm package.
https://www.npmjs.com/package/filter-async-rxjs-pipe

The serial variant with concatMap already works correctly, the parallel variant with flatMap seems not to run in parallel currently. But since I need the concatMap version, I currently have all I need. If somebody has an idea on how to write the parallel version correctly, please add an issue at the connected Git repository. :)


Note
Since I only need to pass a predicate function which returns a Promise, I wrote the conversion of the Promise to an Observable directly into the filterAsync method. If you need to have an Observable as filter input, feel free to adjust the code.

export function filterAsync<T>(predicate: (value: T, index: number) => Promise<boolean>): MonoTypeOperatorFunction<T> {
    let count = 0;
    return pipe(
        // Convert the predicate Promise<boolean> to an observable (which resolves the promise,
        // Then combine the boolean result of the promise with the input data to a container object
        concatMap((data: T) => {
            return from(predicate(data, count++))
                .pipe(map((isValid) => ({filterResult: isValid, entry: data})));
        }),
        // Filter the container object synchronously for the value in each data container object
        filter(data => data.filterResult === true),
        // remove the data container object from the observable chain
        map(data => data.entry)
    );
}

Here is a gist with the full ts file code, including imports:
https://gist.github.com/bjesuiter/288326f9822e0bc82389976f8da66dd8#file-filter-async-ts

like image 10
Benjamin Jesuiter Avatar answered Nov 16 '22 04:11

Benjamin Jesuiter