Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Javascript equivalents for Java Streams API

I like Java 8's Stream API. There are plenty of useful intermediate and terminal methods to transform and collect the stream. I'm talking about intermediate methods like distinct() or terminal methods like collect(). I find the Collector API especially useful for reducing the stream to deeply nested grouping maps.

What is the JavaScript equivalent of the Java Stream API? I know there are basic functions like map, filter and reduce, but I can't find any more general interfaces provided natively by JavaScript to query or group the data in a collection. Are there any production-ready libraries that match the Java Stream API?

like image 799
Tuomas Toivonen Avatar asked Apr 14 '17 08:04

Tuomas Toivonen


3 Answers

At the API level, lodash/RxJS/stream.js may meet the requirement, but the powerful thing about Java Streams is that they can leverage modern multi-core CPU architectures to parallelize the work. However, none of these pure JS libraries solves this: at the end of the day, the JavaScript still runs in a single-threaded runtime and uses only one core at a time.

I guess the JS engine needs to provide support to achieve the performance target.

like image 197
Ming Zhu Avatar answered Oct 20 '22 11:10

Ming Zhu


java 8 stream() is the same as lodash chain()

java 8 collect() is the same as lodash value()

java 8 distinct() is the same as lodash uniq()

java 8 map() is the same as lodash map()

lodash is more comprehensive, since it has been around longer.

like image 12
Eric Hartford Avatar answered Oct 20 '22 10:10

Eric Hartford


JavaScript has no parallelism, so streams would always be sequential, and a collector would not need a combiner.

Here I have tried to mimic the Stream API in JavaScript, but stripped of several features. Still, I think it has the key features.

As you focussed on Collectors, I added a Collector class with a constructor and static methods which roughly correspond to Java's Collectors (plural) interface. The Stream class has a constructor which takes an iterable. It has most of the static methods as in Java, with some variation when it comes to reduce and iterator to make it more aligned with JavaScript's practice.

There's a Stream.Builder class included as well.

Finally, this snippet runs several examples. I think it looks quite familiar if you know the Java API:

/**
 * A reduction recipe loosely modelled on Java's `Collector`/`Collectors`.
 *
 * A collector bundles three functions:
 *  - `supplier()` creates a fresh accumulation container,
 *  - `accumulator(container, item)` folds one item in and RETURNS the container
 *    (or its replacement) that the next step must receive,
 *  - `finisher(container)` converts the final container into the result.
 *
 * Mapper, classifier and finisher callbacks are invoked as `fn.call(x, x)`, so
 * unbound prototype methods (e.g. `Person.prototype.getName`) can be passed
 * as well as ordinary arrow functions.
 */
class Collector {
    /**
     * @param {function(): *} supplier - creates the initial accumulation value
     * @param {function(*, *): *} accumulator - returns the next accumulation value
     * @param {function(*): *} [finisher] - final transformation; identity by default
     */
    constructor(supplier, accumulator, finisher = a => a) {
        this.supplier = supplier;
        this.accumulator = accumulator;
        this.finisher = finisher;
    }

    /**
     * Collects all items into an array, in encounter order.
     * The collector is stateless, so one shared instance is cached on the class.
     * @returns {Collector}
     */
    static toArray() {
        // Cache on the class itself; the global `self` is unrelated to this class.
        return Collector._toArray ??= new Collector(
            () => [],
            (acc, item) => {
                acc.push(item);
                return acc;
            }
        );
    }

    /**
     * Collects the distinct items into a Set.
     * @returns {Collector}
     */
    static toSet() {
        return Collector._toSet ??= new Collector(() => new Set(), (acc, item) => acc.add(item));
    }

    /**
     * Collects items into a Map. When two items produce the same key the later one wins.
     * @param {function(*): *} keyMapper - derives the key from an item
     * @param {function(*): *} valueMapper - derives the value from an item
     * @returns {Collector}
     */
    static toMap(keyMapper, valueMapper) {
        return new Collector(
            () => new Map(),
            // Map#set returns the map, which is what the next step needs.
            (acc, item) => acc.set(keyMapper.call(item, item), valueMapper.call(item, item))
        );
    }

    /**
     * Collects items into a plain object. When two items produce the same key the later one wins.
     * @param {function(*): *} keyMapper - derives the property name from an item
     * @param {function(*): *} valueMapper - derives the property value from an item
     * @returns {Collector}
     */
    static toObject(keyMapper, valueMapper) {
        return new Collector(
            () => ({}),
            (acc, item) => {
                acc[keyMapper.call(item, item)] = valueMapper.call(item, item);
                return acc; // the container, not the assigned value, must be carried forward
            }
        );
    }

    /**
     * Computes the arithmetic mean of the mapped values (NaN when there are no items).
     * @param {function(*): number} [mapper] - extracts the number to average; identity by default
     * @returns {Collector}
     */
    static averaging(mapper = a => a) {
        return new Collector(
            () => [0, 0],
            ([sum, count], value) => [sum + mapper.call(value, value), count + 1],
            ([sum, count]) => sum / count
        );
    }

    /**
     * Wraps a collector so that `nextFinisher` is applied to its finished result.
     * @param {Collector} collector - the collector to adapt (only its three functions are read)
     * @param {function(*): *} nextFinisher - applied after the collector's own finisher
     * @returns {Collector}
     */
    static collectingAndThen({supplier, accumulator, finisher}, nextFinisher) {
        return new Collector(supplier, accumulator, value => {
            const finished = finisher.call(value, value);
            return nextFinisher.call(finished, finished);
        });
    }

    /**
     * Counts the items.
     * @returns {Collector}
     */
    static counting() {
        return Collector._counting ??= new Collector(() => 0, count => count + 1);
    }

    /**
     * Sums the mapped values.
     * @param {function(*): number} [mapper] - extracts the addend; `Number` conversion by default
     * @returns {Collector}
     */
    static summing(mapper = Number) {
        return new Collector(() => 0, (sum, value) => sum + mapper.call(value, value));
    }

    /**
     * Joins the items' string forms: `prefix + items.join(delimiter) + postfix`.
     * @param {string} [delimiter]
     * @param {string} [prefix]
     * @param {string} [postfix]
     * @returns {Collector}
     */
    static joining(delimiter = ",", prefix = "", postfix = "") {
        return Collector.collectingAndThen(
            Collector.toArray(),
            arr => prefix + arr.join(delimiter) + postfix
        );
    }

    // No implementation of partitioningBy, as it is the same as groupingBy with a boolean classifier
    /**
     * Groups items in a Map keyed by `classifier(item)`; each group is reduced
     * with the downstream collector (an array of the group's items by default).
     * @param {function(*): *} classifier - derives the group key from an item
     * @param {Collector} [downstream] - collector applied within each group
     * @returns {Collector}
     */
    static groupingBy(classifier, {supplier, accumulator, finisher} = Collector.toArray()) {
        return new Collector(
            () => new Map(),
            (map, value) => {
                const key = classifier.call(value, value);
                // Test for presence rather than nullishness: a downstream collector may
                // legitimately accumulate to null/undefined, which must not be re-seeded.
                const current = map.has(key) ? map.get(key) : supplier();
                return map.set(key, accumulator(current, value));
            },
            map => {
                // Overwriting existing keys while iterating a Map does not disturb the iteration.
                map.forEach((value, key) => map.set(key, finisher.call(value, value)));
                return map;
            }
        );
    }

    /**
     * Adapts a collector so that it receives `mapper(item)` instead of `item`.
     * @param {function(*): *} mapper
     * @param {Collector} downstream
     * @returns {Collector}
     */
    static mapping(mapper, {supplier, accumulator, finisher}) {
        return new Collector(
            supplier,
            (acc, value) => accumulator(acc, mapper.call(value, value)),
            finisher
        );
    }

    /**
     * Finds the greatest item according to `comparator` (undefined when there are no items).
     * On ties the earliest item is kept.
     * @param {function(*, *): number} comparator
     * @returns {Collector}
     */
    static maxBy(comparator) {
        return new Collector(
            () => undefined,
            (acc, value) => (acc === undefined || comparator(acc, value) < 0 ? value : acc)
        );
    }

    /**
     * Finds the smallest item according to `comparator` (undefined when there are no items).
     * On ties the earliest item is kept.
     * @param {function(*, *): number} comparator
     * @returns {Collector}
     */
    static minBy(comparator) {
        return new Collector(
            () => undefined,
            (acc, value) => (acc === undefined || comparator(acc, value) > 0 ? value : acc)
        );
    }

    /**
     * Reduces the mapped items with `binaryOperator`, starting from `identity`.
     * When `identity` is undefined the first mapped item seeds the reduction.
     * @param {function(*, *): *} binaryOperator
     * @param {*} [identity]
     * @param {function(*): *} [mapper] - identity by default
     * @returns {Collector}
     */
    static reducing(binaryOperator, identity, mapper = a => a) {
        return new Collector(
            () => identity,
            (acc, value) => {
                const mapped = mapper.call(value, value);
                return acc === undefined ? mapped : binaryOperator(acc, mapped);
            }
        );
    }
}

/**
 * A lazy, single-use, sequential pipeline over an iterable, loosely modelled
 * on Java's `Stream`.
 *
 * Intermediate operations return a new Stream and do no work until a terminal
 * operation iterates it. A stream can be iterated only once; iterating it
 * again throws a TypeError. Callbacks are invoked as `fn.call(item, item)`,
 * so unbound prototype methods may be passed as well as arrow functions.
 */
class Stream {
    /**
     * Creates a stream of the given arguments.
     * @param {...*} args
     * @returns {Stream}
     */
    static of(...args) {
        return new Stream(args);
    }

    /**
     * Creates a stream from the iterator returned by calling `generator(...args)`.
     * @param {function(...*): Iterable} generator - typically a generator function
     * @param {...*} args - passed through to `generator`
     * @returns {Stream}
     */
    static fromGenerator(generator, ...args) {
        return new Stream(generator.call(null, ...args));
    }

    /**
     * Creates a stream without items.
     * @returns {Stream}
     */
    static empty() {
        return this.of();
    }

    /**
     * Mutable builder that collects items and is converted into a Stream exactly once.
     */
    static Builder = class Builder {
        _items = []; // set to null once build() has been called

        /**
         * Adds an item and returns the builder, for chaining.
         * @throws {Error} when the builder has already been built
         */
        add(item) {
            this.accept(item);
            return this;
        }

        /**
         * Adds an item without returning anything.
         * @throws {Error} when the builder has already been built
         */
        accept(item) {
            if (!this._items) throw new Error("The builder has already transitioned to the built state");
            this._items.push(item);
        }

        /**
         * Returns a stream of the added items and moves the builder to the built state.
         * @returns {Stream}
         * @throws {Error} when the builder has already been built
         */
        build() {
            if (!this._items) throw new Error("The builder has already transitioned to the built state");
            const items = this._items;
            this._items = null;
            return new Stream(items);
        }
    };

    /**
     * @returns {Stream.Builder} a fresh builder
     */
    static builder() {
        return new this.Builder();
    }

    /**
     * Creates the infinite stream `value, f(value), f(f(value)), ...`.
     * @param {*} value - the first item
     * @param {function(*): *} produceNextFromLast - derives the next item from the previous one
     * @returns {Stream}
     */
    static iterate(value, produceNextFromLast) {
        return this.fromGenerator(function* () {
            let current = value;
            yield current;
            while (true) {
                current = produceNextFromLast.call(current, current);
                yield current;
            }
        });
    }

    /**
     * Creates an infinite stream whose items are the successive results of `produceNext()`.
     * @param {function(): *} produceNext
     * @returns {Stream}
     */
    static generate(produceNext) {
        return this.fromGenerator(function* () {
            while (true) yield produceNext();
        });
    }

    /**
     * Creates a stream of all items of `a` followed by all items of `b`.
     * @param {Iterable} a
     * @param {Iterable} b
     * @returns {Stream}
     */
    static concat(a, b) {
        return this.fromGenerator(function* () {
            yield* a;
            yield* b;
        });
    }

    /**
     * Creates a stream of `start, start + 1, ...` up to but excluding `end`.
     * @param {number} start
     * @param {number} end
     * @returns {Stream}
     */
    static range(start, end) {
        return this.fromGenerator(function* () {
            for (let current = start; current < end; current++) yield current;
        });
    }

    /**
     * Like `range`, but `last` is included.
     * @param {number} start
     * @param {number} last
     * @returns {Stream}
     */
    static rangeClosed(start, last) {
        return this.range(start, last + 1);
    }

    /**
     * @param {Iterable} iterable - the source of the stream's items
     */
    constructor(iterable) {
        this._iterable = iterable;
    }

    // Intermediate (Chainable, pipe) methods

    /**
     * Private helper: creates the next stage by running `generator` with this stream as input.
     * @param {function(Stream): Iterable} generator
     * @returns {Stream}
     */
    _chain(generator) {
        return Stream.fromGenerator(generator, this);
    }

    /**
     * Keeps only the items for which `predicate` returns a truthy value.
     * @param {function(*): *} predicate
     * @returns {Stream}
     */
    filter(predicate) {
        return this._chain(function* (previous) {
            for (const item of previous) {
                if (predicate.call(item, item)) yield item;
            }
        });
    }

    /**
     * Drops items that were already seen (compared the way a Set compares values).
     * @returns {Stream}
     */
    distinct() {
        const seen = new Set();
        return this.filter(item => {
            if (seen.has(item)) return false;
            seen.add(item);
            return true;
        });
    }

    /**
     * Replaces every item with `mapper(item)`.
     * @param {function(*): *} mapper
     * @returns {Stream}
     */
    map(mapper) {
        return this._chain(function* (previous) {
            for (const item of previous) yield mapper.call(item, item);
        });
    }

    /**
     * Replaces every item with the items of the iterable returned by `mapper(item)`.
     * @param {function(*): Iterable} mapper
     * @returns {Stream}
     */
    flatMap(mapper) {
        return this._chain(function* (previous) {
            for (const item of previous) yield* mapper.call(item, item);
        });
    }

    /**
     * Calls `action(item)` for every item that passes through. Only for debugging.
     * @param {function(*): void} action
     * @returns {Stream}
     */
    peek(action) {
        return this._chain(function* (previous) {
            for (const item of previous) {
                action.call(item, item);
                yield item;
            }
        });
    }

    /**
     * Sorts the items. All upstream items are buffered once iteration starts,
     * so the upstream must be finite.
     * @param {function(*, *): number} [comparator] - natural `<`/`>` ordering by default
     * @returns {Stream}
     */
    sorted(comparator = (a, b) => (a > b) - (a < b)) {
        return this._chain(function* (previous) {
            yield* [...previous].sort(comparator);
        });
    }

    /**
     * Drops the leading items for which `predicate` is truthy; everything from the
     * first rejected item onwards passes through without consulting `predicate` again.
     * @param {function(*): *} predicate
     * @returns {Stream}
     */
    dropWhile(predicate) {
        let active = false;
        return this.filter(item => active ||= !predicate.call(item, item));
    }

    /**
     * Drops the first `n` items.
     * @param {number} n
     * @returns {Stream}
     */
    skip(n) {
        let remaining = n;
        return this.dropWhile(() => {
            if (!(remaining > 0)) return false;
            remaining--;
            return true;
        });
    }

    /**
     * Passes items through until `predicate` returns a falsy value, then ends the stream.
     * The rejected item is consumed from the upstream but not emitted.
     * @param {function(*): *} predicate
     * @returns {Stream}
     */
    takeWhile(predicate) {
        return this._chain(function* (previous) {
            for (const item of previous) {
                if (!predicate.call(item, item)) break;
                yield item;
            }
        });
    }

    /**
     * Ends the stream after at most `maxSize` items. No upstream item beyond the
     * last emitted one is pulled, so upstream side effects and expensive or
     * blocking producers are not triggered needlessly.
     * @param {number} maxSize
     * @returns {Stream}
     */
    limit(maxSize) {
        return this._chain(function* (previous) {
            let remaining = maxSize;
            if (!(remaining > 0)) return; // also covers NaN
            for (const item of previous) {
                yield item;
                // Stop right after the last permitted item instead of pulling one more.
                if (--remaining <= 0) break;
            }
        });
    }

    // Terminal operations below: these do not return a Stream

    /**
     * Iterates the stream's items; JS symbol convention instead of an "iterator" method.
     * The stream is marked as consumed when iteration starts.
     * @throws {TypeError} when the stream was already iterated or closed
     */
    *[Symbol.iterator]() {
        const iterable = this._iterable;
        this.close();
        yield* iterable;
    }

    /**
     * Marks the stream as consumed.
     * @throws {TypeError} when the stream was already iterated or closed
     */
    close() {
        if (!this._iterable) throw new TypeError("stream has already been operated on or closed");
        this._iterable = null;
    }

    /**
     * Calls `callback(item)` for every item.
     * @param {function(*): void} callback
     */
    forEach(callback) {
        for (const item of this) callback.call(item, item);
    }

    /**
     * @returns {Array} all items, in order
     */
    toArray() {
        return [...this];
    }

    /**
     * @returns {*} the first item, or undefined when the stream has no items
     */
    findFirst() {
        for (const item of this) return item;
        return undefined;
    }

    /**
     * @param {function(*): *} predicate
     * @returns {boolean} true when no item fails `predicate` (so true for an empty stream)
     */
    allMatch(predicate) {
        for (const item of this) {
            if (!predicate.call(item, item)) return false;
        }
        return true;
    }

    /**
     * @param {function(*): *} predicate
     * @returns {boolean} true when at least one item satisfies `predicate`
     */
    anyMatch(predicate) {
        for (const item of this) {
            if (predicate.call(item, item)) return true;
        }
        return false;
    }

    /**
     * @param {function(*): *} predicate
     * @returns {boolean} true when no item satisfies `predicate`
     */
    noneMatch(predicate) {
        return !this.anyMatch(predicate);
    }

    /**
     * Reduces the stream into a container. Call either with the three functions, or
     * with a single object having `supplier`, `accumulator` and `finisher` properties
     * (such as a Collector instance).
     * @param {function(): *|{supplier: Function, accumulator: Function, finisher: Function}} supplier
     * @param {function(*, *): *} [accumulator] - must return the next accumulation value
     * @param {function(*): *} [finisher] - identity by default
     * @returns {*} the finished result
     */
    collect(supplier, accumulator, finisher = a => a) {
        // Can be called with Collector instance as first argument
        if (arguments.length === 1) {
            ({supplier, accumulator, finisher} = supplier);
        }
        const reduced = this.reduce(accumulator, supplier());
        return finisher.call(reduced, reduced);
    }

    /**
     * Folds the items left to right; interface like Array#reduce.
     * @param {function(*, *): *} accumulator - receives (result so far, item)
     * @param {*} [initialValue] - when omitted, the first item seeds the fold
     * @returns {*}
     * @throws {TypeError} when the stream is empty and no initial value was passed
     */
    reduce(accumulator, initialValue) {
        const iterator = this[Symbol.iterator]();
        let result = initialValue;
        // arguments.length (not `=== undefined`) so that an explicit undefined seed is honoured.
        if (arguments.length === 1) {
            const head = iterator.next();
            if (head.done) throw new TypeError("reduce of empty stream without initial value");
            result = head.value;
        }
        for (const item of iterator) {
            result = accumulator(result, item);
        }
        return result;
    }

    /**
     * @returns {number} the number of items
     */
    count() {
        return this.reduce(count => count + 1, 0);
    }

    /**
     * @param {function(*, *): number} [comparator] - natural `<`/`>` ordering by default
     * @returns {*} the greatest item; on ties the earliest one
     * @throws {TypeError} when the stream is empty
     */
    max(comparator = (a, b) => (a > b) - (a < b)) {
        return this.reduce((a, b) => (comparator(a, b) < 0 ? b : a));
    }

    /**
     * @param {function(*, *): number} [comparator] - natural `<`/`>` ordering by default
     * @returns {*} the smallest item; on ties the latest one
     * @throws {TypeError} when the stream is empty
     */
    min(comparator = (a, b) => (a > b) - (a < b)) {
        return this.reduce((a, b) => (comparator(a, b) < 0 ? a : b));
    }

    /**
     * Adds the items up with `+`: sums numbers or concatenates strings.
     * The first item seeds the total, so strings are not prefixed with "0".
     * @returns {*} the total, or 0 when the stream has no items
     */
    sum() {
        const iterator = this[Symbol.iterator]();
        const head = iterator.next();
        if (head.done) return 0;
        let total = head.value;
        for (const item of iterator) total += item;
        return total;
    }

    /**
     * @returns {number} the arithmetic mean of the items (NaN when the stream has no items)
     */
    average() {
        const [count, total] = this.reduce(([n, sum], item) => [n + 1, sum + item], [0, 0]);
        return total / count;
    }
}

// Some example uses....

// Pairs n with n + 100 for n = 1, 2, ... and keeps the first four values.
const first = Stream
    .iterate(1, n => n + 1)
    .flatMap(n => Stream.iterate(n, m => m + 100).limit(2))
    .limit(4);

// A builder-made stream that logs each item as it flows through.
const second = Stream.builder()
    .add(9)
    .add(8)
    .add(7)
    .build()
    .peek(console.log);

{
    // Streams are single-use, so every example gets a fresh one.
    const oneToNine = () => Stream.range(1, 10);
    const letters = () => Stream.of(..."fabulous");

    const concatenated = Stream.concat(first, second).toArray();
    console.log("concat", concatenated);

    console.log("average", oneToNine().average());
    console.log("sum", oneToNine().sum());

    const randoms = Stream.generate(Math.random).limit(10).toArray();
    console.log("random", randoms);

    const window = oneToNine().skip(4).limit(4).toArray();
    console.log("skip & limit", window);

    console.log("first", oneToNine().findFirst());

    const smallest = letters().min();
    const largest = letters().max();
    console.log("min, max", smallest, largest);

    console.log("count", oneToNine().count());

    const uniqueSorted = Stream.of(1, 5, 1, 2, 4, 2).distinct().sorted().toArray();
    console.log("distinct and sorted", uniqueSorted);
}

/**
 * Minimal employee record used by the collector examples.
 * Exposes Java-style getters so they can be passed as unbound mappers.
 */
class Person {
    /**
     * @param {string} name
     * @param {string} department
     * @param {number} salary
     */
    constructor(name, department, salary) {
        Object.assign(this, {name, department, salary});
    }

    /** @returns {string} the person's name */
    getName() {
        const {name} = this;
        return name;
    }

    /** @returns {string} the department the person works in */
    getDepartment() {
        const {department} = this;
        return department;
    }

    /** @returns {number} the person's salary */
    getSalary() {
        const {salary} = this;
        return salary;
    }

    /** @returns {string} a greeting containing the name */
    toString() {
        const {name} = this;
        return `Hi ${name}!`;
    }
}

// Sample staff: [name, department, salary]
let people = [
    ["John", "reception", 1000],
    ["Mary", "stocks", 1500],
    ["Bob", "stocks", 1400],
].map(fields => new Person(...fields));

{
    // Streams are single-use, so each example starts from a fresh one.
    const staff = () => Stream.of(...people);
    const {getName, getDepartment, getSalary} = Person.prototype;

    // Names as an array, then as a set
    const nameArray = staff().map(getName).collect(Collector.toArray());
    console.log(nameArray);
    const nameSet = staff().map(getName).collect(Collector.toSet());
    console.log(nameSet);

    // Joining relies on Person#toString
    const greeting = staff().collect(Collector.joining(", "));
    console.log(greeting);

    // Total salary
    const payroll = staff().collect(Collector.summing(getSalary));
    console.log(payroll);

    // People per department
    const byDepartment = staff().collect(Collector.groupingBy(getDepartment));
    console.log(...byDepartment);

    // Average salary per department
    const averageSalary = Collector.averaging(getSalary);
    const averageByDepartment = staff().collect(Collector.groupingBy(getDepartment, averageSalary));
    console.log(...averageByDepartment);

    // Partition on a boolean classifier
    const bySalaryBand = staff().collect(Collector.groupingBy(person => person.getSalary() >= 1300));
    console.log(...bySalaryBand);
}

// Fibonacci: iterate over pairs [a, b] -> [b, a+b], keep the first member of each pair,
// stop at the first value that is not below 30, then drop the leading values below 2.
console.log(Stream.iterate([0, 1], ([a, b]) => [b, a+b]) 
                  .map(([a]) => a)
                  .takeWhile(a => a < 30)
                  .dropWhile(a => a < 2)
                  .toArray());

// Accumulate object keys: collect() called with separate supplier and accumulator functions;
// every number of the range becomes a property with value 1 on the supplied object.
console.log(Stream.range(0, 10).collect(Object, (acc, a) => Object.assign(acc, {[a]: 1}))); 

// Build complete binary tree in breadth-first order
console.log(Stream.iterate(0, i => i + 1)
    .limit(10)
    .collect(
        // Supplier: a dummy root whose left slot is pre-filled, so the first value lands in
        // its right slot. The accumulation value is [queue of nodes still to fill, dummy root].
        () => (root => [[root], root])({ left: "x" }), 
        (acc, value) => {
            let [level] = acc;
            // If the node at the front of the queue already has a left child, the new node
            // becomes its right child and that node leaves the queue; otherwise the new node
            // becomes its left child. Either way the new node is appended to the queue.
            level.push(level[0].left ? (level.shift().right = {value}) : (level[0].left = {value}))
            return acc;
        },
        // Finisher: take the dummy root (last element) and return the tree hanging off its right slot.
        acc => acc.pop().right
    )
);
like image 2
trincot Avatar answered Oct 20 '22 11:10

trincot