I like Java 8's streaming API. There are plenty of useful intermediate and terminal methods to transform and collect a stream. I'm talking about intermediate methods like distinct()
or terminal methods like collect()
. I find the Collector
API especially useful, to reduce the stream to deep grouping maps.
What is the JavaScript equivalent of the Java streaming API? I know there are basic functions like map
, filter
and reduce
, but I can't find any more general interfaces provided natively by JavaScript to query or group the data in a collection. Are there any production-ready libraries that match the Java Streaming API?
At the API level, lodash/RxJS/stream.js may meet the requirement, but the powerful thing about Java streams is that they can leverage modern multi-core CPU architectures to parallelize the work. None of these pure JS libraries solves this: at the end of the day, they still run in a single-threaded runtime and use only one core at a time.
I guess the JS engine needs to provide support to achieve the performance target.
java 8 stream() is the same as lodash chain()
java 8 collect() is the same as lodash value()
java 8 distinct() is the same as lodash uniq()
java 8 map() is the same as lodash map()
lodash is more comprehensive, since it has been around longer.
JavaScript has no parallelism, so streams would always be sequential, and a collector would not need a combiner.
I have here tried to mimic the Stream API in JavaScript, but stripped from several features. Still I think it has the key features.
As you focussed on Collectors
, I added a Collector
class with a constructor and static methods which roughly correspond to Java's Collectors
(plural) interface. The Stream
class has a constructor which takes an iterable. It has most of the static methods as in Java, with some variation when it comes to reduce
and iterator
to make it more aligned with JavaScript's practice.
There's a Stream.Builder
class included as well.
Finally, this snippet runs several examples. I think it looks quite familiar if you know the Java API:
/**
 * A reduction recipe, loosely modelled on Java's `Collector`.
 *
 * A collector is a triple of functions:
 *  - `supplier()`               creates a fresh result container,
 *  - `accumulator(acc, item)`   folds one item in and RETURNS the (possibly new) container,
 *  - `finisher(acc)`            converts the container into the final result.
 *
 * There is no combiner: the fold is always sequential.
 *
 * User-supplied mappers, classifiers and finishers are invoked as `fn.call(item, item)`,
 * so both arrow functions (`p => p.name`) and unbound prototype methods
 * (`Person.prototype.getName`) can be passed.
 */
class Collector {
    /**
     * @param {function(): *} supplier - creates the initial container.
     * @param {function(*, *): *} accumulator - `(acc, item) => newAcc`.
     * @param {function(*): *} [finisher] - final transformation; identity by default.
     */
    constructor(supplier, accumulator, finisher = a => a) {
        this.supplier = supplier;
        this.accumulator = accumulator;
        this.finisher = finisher;
    }

    /** Collects all items into an array. The collector itself is stateless, so it is cached. */
    static toArray() {
        // Cache on the class, not on the global `self`, so nothing leaks into the global scope.
        return Collector._toArray ??= new Collector(() => [], (acc, item) => (acc.push(item), acc));
    }

    /** Collects all items into a Set. The collector itself is stateless, so it is cached. */
    static toSet() {
        return Collector._toSet ??= new Collector(() => new Set, (acc, item) => acc.add(item));
    }

    /**
     * Collects into a Map of `keyMapper(item) -> valueMapper(item)`.
     * A later item with the same key overwrites the earlier entry.
     */
    static toMap(keyMapper, valueMapper) {
        return new Collector(
            () => new Map,
            // Map#set returns the map itself, which is what the fold must carry forward.
            (acc, item) => acc.set(keyMapper.call(item, item), valueMapper.call(item, item))
        );
    }

    /**
     * Collects into a plain object with `keyMapper(item)` as property name and
     * `valueMapper(item)` as property value. Later items overwrite equal keys.
     */
    static toObject(keyMapper, valueMapper) {
        return new Collector(
            () => ({}),
            (acc, item) => {
                acc[keyMapper.call(item, item)] = valueMapper.call(item, item);
                return acc; // the container, not the assigned value
            }
        );
    }

    /** Arithmetic mean of `mapper(item)`. Yields NaN when no items were collected (0 / 0). */
    static averaging(mapper = a => a) {
        return new Collector(
            () => [0, 0],
            ([sum, count], value) => [sum + mapper.call(value, value), count + 1],
            ([sum, count]) => sum / count
        );
    }

    /** Wraps a collector so that `nextFinisher` is applied after the collector's own finisher. */
    static collectingAndThen({supplier, accumulator, finisher}, nextFinisher) {
        return new Collector(
            supplier,
            accumulator,
            value => {
                const finished = finisher.call(value, value);
                return nextFinisher.call(finished, finished);
            }
        );
    }

    /** Counts the collected items. */
    static counting() {
        return Collector._counting ??= new Collector(() => 0, count => count + 1);
    }

    /** Sums `mapper(item)`; by default each item is coerced with `Number`. */
    static summing(mapper = Number) {
        return new Collector(() => 0, (sum, value) => sum + mapper.call(value, value));
    }

    /** Joins the string form of all items: `prefix + items.join(delimiter) + postfix`. */
    static joining(delimiter = ",", prefix = "", postfix = "") {
        return Collector.collectingAndThen(Collector.toArray(), arr => prefix + arr.join(delimiter) + postfix);
    }

    // No implementation of partitioningBy, as it is the same as groupingBy with a boolean classifier

    /**
     * Groups items into a Map keyed by `classifier(item)`. Each group is reduced with the
     * downstream collector (an array of the group's items by default).
     */
    static groupingBy(classifier, {supplier, accumulator, finisher} = Collector.toArray()) {
        return new Collector(
            () => new Map,
            (map, value) => {
                const key = classifier.call(value, value);
                // Test for presence with has(): a group whose accumulated state is
                // null/undefined must not be re-seeded from the supplier.
                const group = map.has(key) ? map.get(key) : supplier();
                return map.set(key, accumulator(group, value));
            },
            map => {
                // Overwriting existing keys while iterating is safe: no entries are added or removed.
                map.forEach((value, key) => map.set(key, finisher.call(value, value)));
                return map;
            }
        );
    }

    /** Applies `mapper` to each item before handing it to the downstream collector. */
    static mapping(mapper, {supplier, accumulator, finisher}) {
        return new Collector(
            supplier,
            (acc, value) => accumulator(acc, mapper.call(value, value)),
            finisher
        );
    }

    /**
     * Keeps the greatest item according to `comparator(a, b)`; `undefined` when nothing was collected.
     * `undefined` doubles as the "nothing seen yet" marker.
     */
    static maxBy(comparator) {
        return new Collector(
            () => undefined,
            (acc, value) => acc === undefined || comparator(acc, value) < 0 ? value : acc
        );
    }

    /**
     * Keeps the smallest item according to `comparator(a, b)`; `undefined` when nothing was collected.
     * `undefined` doubles as the "nothing seen yet" marker.
     */
    static minBy(comparator) {
        return new Collector(
            () => undefined,
            (acc, value) => acc === undefined || comparator(acc, value) > 0 ? value : acc
        );
    }

    /**
     * General fold: starts from `identity` and combines `mapper(item)` into it with `binaryOperator`.
     * When `identity` is `undefined`, the first mapped item becomes the starting value.
     */
    static reducing(binaryOperator, identity, mapper = a => a) {
        return new Collector(
            () => identity,
            (acc, value) => {
                const mapped = mapper.call(value, value);
                return acc === undefined ? mapped : binaryOperator(acc, mapped);
            }
        );
    }
}
/**
 * A lazy, single-use sequence of values, loosely modelled on Java's `Stream`.
 *
 * A stream wraps an iterable. Intermediate operations return a new stream backed by a
 * generator, so no work happens until a terminal operation iterates the chain.
 * A stream can be iterated only once: starting an iteration closes it, and iterating a
 * closed stream throws a TypeError.
 *
 * User callbacks (predicates, mappers, actions) are invoked as `fn.call(item, item)`, so
 * both arrow functions and unbound prototype methods can be passed.
 */
class Stream {
    /** Creates a stream of the given arguments. */
    static of(...args) {
        return new Stream(args);
    }

    /** Creates a stream from the iterator returned by `generator(...args)`. */
    static fromGenerator(generator, ...args) {
        return new Stream(generator.call(null, ...args));
    }

    /** Creates a stream with no elements. */
    static empty() {
        return this.of();
    }

    /** Mutable builder; after `build()` it can no longer accept items or build again. */
    static Builder = class Builder {
        _items = [];

        /** Chainable variant of `accept`. */
        add(item) {
            this.accept(item);
            return this;
        }

        /**
         * Appends an item.
         * @throws {Error} if `build()` has already been called.
         */
        accept(item) {
            if (!this._items) throw new Error("The builder has already transitioned to the built state");
            this._items.push(item);
        }

        /**
         * Returns a stream over the accepted items and retires the builder.
         * @throws {Error} if `build()` has already been called.
         */
        build() {
            if (!this._items) throw new Error("The builder has already transitioned to the built state");
            const {_items} = this;
            this._items = null; // marks the builder as built
            return new Stream(_items);
        }
    };

    /** Returns a new `Stream.Builder`. */
    static builder() {
        return new this.Builder();
    }

    /** Infinite stream: `value`, `f(value)`, `f(f(value))`, ... */
    static iterate(value, produceNextFromLast) {
        return this.fromGenerator(function* () {
            yield value;
            while (true) yield value = produceNextFromLast.call(value, value);
        });
    }

    /** Infinite stream of the results of calling `produceNext()` repeatedly. */
    static generate(produceNext) {
        return this.fromGenerator(function* () {
            while (true) yield produceNext();
        });
    }

    /** Stream of all elements of iterable `a` followed by all elements of iterable `b`. */
    static concat(a, b) {
        return this.fromGenerator(function* () {
            yield* a;
            yield* b;
        });
    }

    /** Stream of `start, start + 1, ...` up to but excluding `end`. */
    static range(start, end) {
        return this.fromGenerator(function* () {
            while (start < end) yield start++;
        });
    }

    /** Like `range`, but `last` is included. */
    static rangeClosed(start, last) {
        return this.range(start, last + 1);
    }

    /** @param {Iterable} iterable - the source of this stream's elements. */
    constructor(iterable) {
        this._iterable = iterable;
    }

    // Intermediate (Chainable, pipe) methods

    /** Private helper: new stream whose generator receives this stream as its only argument. */
    _chain(generator) {
        return Stream.fromGenerator(generator, this);
    }

    /** Keeps only the items for which `predicate` returns a truthy value. */
    filter(predicate) {
        return this._chain(function* (previous) {
            for (const item of previous) {
                if (predicate.call(item, item)) yield item;
            }
        });
    }

    /** Drops items already seen (Set semantics), keeping first occurrences in order. */
    distinct() {
        const seen = new Set;
        // Set#add returns the set (truthy), so a newly seen item passes the filter.
        return this.filter(item => !seen.has(item) && seen.add(item));
    }

    /** Replaces each item by `mapper(item)`. */
    map(mapper) {
        return this._chain(function* (previous) {
            for (const item of previous) yield mapper.call(item, item);
        });
    }

    /** Replaces each item by all elements of the iterable returned by `mapper(item)`. */
    flatMap(mapper) {
        return this._chain(function* (previous) {
            for (const item of previous) yield* mapper.call(item, item);
        });
    }

    /** Calls `action(item)` for each item as it passes through. Only for debugging. */
    peek(action) {
        return this._chain(function* (previous) {
            for (const item of previous) {
                action.call(item, item);
                yield item;
            }
        });
    }

    /** Sorts the items; buffers the whole upstream, so it must be finite. */
    sorted(comparator = (a, b) => (a > b) - (a < b)) {
        return this._chain(function* (previous) {
            yield* [...previous].sort(comparator);
        });
    }

    /** Drops the leading items for which `predicate` is truthy, then passes everything. */
    dropWhile(predicate) {
        let active = false;
        return this.filter(item => active ||= !predicate.call(item, item));
    }

    /** Drops the first `n` items. */
    skip(n) {
        let remaining = n;
        return this.dropWhile(() => remaining > 0 && remaining--);
    }

    /** Passes items until `predicate` is falsy for one; that item and the rest are dropped. */
    takeWhile(predicate) {
        return this._chain(function* (previous) {
            for (const item of previous) {
                if (!predicate.call(item, item)) break;
                yield item;
            }
        });
    }

    /**
     * Passes at most `maxSize` items.
     * Stops WITHOUT pulling a further element from upstream once the limit is reached, so
     * upstream side effects are not triggered for an item that would be discarded and
     * `infinite.filter(...).limit(n)` terminates even if no further match exists.
     */
    limit(maxSize) {
        return this._chain(function* (previous) {
            let remaining = maxSize;
            if (!(remaining > 0)) return;
            for (const item of previous) {
                yield item;
                // Returning here (rather than on the next pull) also closes the upstream iterator.
                if (--remaining <= 0) return;
            }
        });
    }

    // Terminal operations below: these do not return a Stream

    /** Iterates the stream once; uses the JS symbol convention instead of an "iterator" method. */
    *[Symbol.iterator]() {
        const iterable = this._iterable;
        this.close(); // throws if this stream was already consumed
        yield* iterable;
    }

    /**
     * Marks the stream as consumed.
     * @throws {TypeError} if the stream was already consumed or closed.
     */
    close() {
        if (!this._iterable) throw new TypeError("stream has already been operated on or closed");
        this._iterable = null;
    }

    /** Calls `callback(item)` for every item. */
    forEach(callback) {
        for (const item of this) callback.call(item, item);
    }

    /** Returns all items as an array. */
    toArray() {
        return [...this];
    }

    /** Returns the first item, or `undefined` for an empty stream. */
    findFirst() {
        for (const item of this) return item;
    }

    /** True if `predicate` is truthy for every item (true for an empty stream). Short-circuits. */
    allMatch(predicate) {
        for (const item of this) {
            if (!predicate.call(item, item)) return false;
        }
        return true;
    }

    /** True if `predicate` is truthy for at least one item. Short-circuits. */
    anyMatch(predicate) {
        for (const item of this) {
            if (predicate.call(item, item)) return true;
        }
        return false;
    }

    /** True if `predicate` is truthy for no item. */
    noneMatch(predicate) {
        return !this.anyMatch(predicate);
    }

    /**
     * Mutable reduction. Call either as `collect(supplier, accumulator[, finisher])` or with a
     * single object exposing `supplier`, `accumulator` and `finisher` (e.g. a Collector).
     */
    collect(supplier, accumulator, finisher = a => a) {
        // Can be called with Collector instance as first argument
        if (arguments.length === 1) {
            ({supplier, accumulator, finisher} = supplier);
        }
        const reduced = this.reduce(accumulator, supplier());
        return finisher.call(reduced, reduced);
    }

    /**
     * Fold with an interface like Array#reduce: when `initialValue` is omitted, the first
     * item seeds the fold.
     * @throws {TypeError} if the stream is empty and no initial value was given.
     */
    reduce(accumulator, initialValue) {
        let done, result = initialValue;
        const iterator = this[Symbol.iterator]();
        // arguments.length distinguishes "omitted" from an explicit `undefined` seed.
        if (arguments.length === 1) {
            ({done, value: result} = iterator.next());
            if (done) throw new TypeError("reduce of empty stream without initial value");
        }
        for (const item of iterator) {
            result = accumulator(result, item);
        }
        return result;
    }

    /** Number of items. */
    count() {
        return this.reduce(count => count + 1, 0);
    }

    /**
     * Greatest item according to `comparator`.
     * @throws {TypeError} if the stream is empty.
     */
    max(comparator = (a, b) => (a > b) - (a < b)) {
        return this.reduce((a, b) => comparator(a, b) < 0 ? b : a);
    }

    /**
     * Smallest item according to `comparator`.
     * @throws {TypeError} if the stream is empty.
     */
    min(comparator = (a, b) => (a > b) - (a < b)) {
        return this.reduce((a, b) => comparator(a, b) < 0 ? a : b);
    }

    /**
     * Will sum numbers or concatenate strings. An empty stream sums to 0.
     * The seed is chosen from the first item ("" for a string, 0 otherwise) so that
     * concatenating strings does not produce a spurious leading "0".
     */
    sum() {
        const iterator = this[Symbol.iterator]();
        const {done, value: first} = iterator.next();
        if (done) return 0;
        let total = (typeof first === "string" ? "" : 0) + first;
        for (const item of iterator) total += item;
        return total;
    }

    /** Arithmetic mean of the items; NaN for an empty stream (0 / 0). */
    average() {
        const [count, sum] = this.reduce(([count, sum], b) => [count + 1, sum + b], [0, 0]);
        return sum / count;
    }
}
// Some example uses....
// `first` is lazy: for i = 1, 2, ... it emits i and i + 100, and the outer limit keeps
// the first four values (1, 101, 2, 102). Nothing is computed until it is iterated.
const first = Stream.iterate(1, i => i + 1)
.flatMap(i => Stream.iterate(i, j => j + 100).limit(2))
.limit(4);
// `second` holds 9, 8, 7; peek makes console.log print each of them as it is pulled through.
const second = Stream.builder().add(9).add(8).add(7).build().peek(console.log);
// toArray() runs before the outer console.log, so the peek output appears first.
console.log("concat", Stream.concat(first, second).toArray());
// range(1, 10) excludes the end value, i.e. it covers 1..9.
console.log("average", Stream.range(1, 10).average());
console.log("sum", Stream.range(1, 10).sum());
// generate() is infinite; limit() is what makes toArray() terminate.
console.log("random", Stream.generate(Math.random).limit(10).toArray());
console.log("skip & limit", Stream.range(1, 10).skip(4).limit(4).toArray());
console.log("first", Stream.range(1, 10).findFirst());
// A stream is single-use, so min and max each need a fresh stream over the characters.
console.log("min, max", Stream.of(..."fabulous").min(), Stream.of(..."fabulous").max());
console.log("count", Stream.range(1, 10).count());
console.log("distinct and sorted", Stream.of(1, 5, 1, 2, 4, 2).distinct().sorted().toArray());
/**
 * Minimal record type used by the examples: a name, a department and a salary,
 * each exposed through a Java-style getter.
 */
class Person {
    /**
     * @param {string} name
     * @param {string} department
     * @param {number} salary
     */
    constructor(name, department, salary) {
        Object.assign(this, {name, department, salary});
    }

    /** @returns the person's name. */
    getName() {
        const {name} = this;
        return name;
    }

    /** @returns the person's department. */
    getDepartment() {
        const {department} = this;
        return department;
    }

    /** @returns the person's salary. */
    getSalary() {
        const {salary} = this;
        return salary;
    }

    /** @returns a greeting containing the person's name. */
    toString() {
        const {name} = this;
        return `Hi ${name}!`;
    }
}
// Sample data for the Collector examples below.
let people = [
new Person("John", "reception", 1000),
new Person("Mary", "stocks", 1500),
new Person("Bob", "stocks", 1400),
];
// Unbound prototype methods can be passed as mappers because the stream invokes
// callbacks with the current item as `this`.
console.log(Stream.of(...people)
.map(Person.prototype.getName)
.collect(Collector.toArray()));
console.log(Stream.of(...people)
.map(Person.prototype.getName)
.collect(Collector.toSet()));
// joining() stringifies the items via Array#join, which uses Person#toString here.
console.log(Stream.of(...people)
.collect(Collector.joining(", ")));
console.log(Stream.of(...people)
.collect(Collector.summing(Person.prototype.getSalary)));
// groupingBy() returns a Map; spreading it logs each [key, group] entry as its own argument.
console.log(...Stream.of(...people)
.collect(Collector.groupingBy(Person.prototype.getDepartment)));
// Same grouping, but each department's group is reduced to its average salary.
console.log(...Stream.of(...people)
.collect(Collector.groupingBy(Person.prototype.getDepartment,
Collector.averaging(Person.prototype.getSalary))));
// A boolean classifier makes groupingBy act as a partition (keys true / false).
console.log(...Stream.of(...people)
.collect(Collector.groupingBy(person => person.getSalary() >= 1300)));
// Fibonacci: iterate over [current, next] pairs, keep the current term,
// stop at the first term >= 30 and drop the leading terms < 2.
console.log(Stream.iterate([0, 1], ([a, b]) => [b, a+b])
.map(([a]) => a)
.takeWhile(a => a < 30)
.dropWhile(a => a < 2)
.toArray());
// Accumulate object keys (three-argument collect form: supplier, accumulator, default finisher)
console.log(Stream.range(0, 10).collect(Object, (acc, a) => Object.assign(acc, {[a]: 1})));
// Build complete binary tree in breadth-first order
// The accumulator state is [queue, dummyRoot]: the queue holds nodes that still lack a child.
// The dummy root starts with a truthy `left`, so the first value becomes its `right` child,
// which is the real root that the finisher returns.
console.log(Stream.iterate(0, i => i + 1)
.limit(10)
.collect(
() => (root => [[root], root])({ left: "x" }),
(acc, value) => {
let [level] = acc;
// If the front node already has a left child, fill its right child and dequeue it;
// otherwise fill its left child. Either way the new node joins the back of the queue.
level.push(level[0].left ? (level.shift().right = {value}) : (level[0].left = {value}))
return acc;
},
// acc is [queue, dummyRoot]; pop() yields the dummy root, whose right child is the tree.
acc => acc.pop().right
)
);
If you like our work, you can donate via PayPal or buy us a coffee so we can maintain and grow the site. Thank you!
Donate Us With