So as it goes in the current scenario, we have a set of APIs as listed below:
Consumer<T> start();
Consumer<T> performDailyAggregates();
Consumer<T> performLastNDaysAggregates();
Consumer<T> repopulateScores();
Consumer<T> updateDataStore();
Over these, one of our schedulers performs the tasks e.g.
private void performAllTasks(T data) {
start().andThen(performDailyAggregates())
.andThen(performLastNDaysAggregates())
.andThen(repopulateScores())
.andThen(updateDataStore())
.accept(data);
}
While reviewing this, I thought of moving to a more flexible implementation 1 of performing tasks which would look like:
// NOOP in the context further stands for 'anything -> {}'
private void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
consumerList.reduce(NOOP, Consumer::andThen).accept(data);
}
The point that strikes my mind now is that the Javadoc clearly states that
accumulator
- an associative, non-interfering, stateless function for combining two values
Next up I was thinking How to ensure order of processing in java8 streams? to be ordered (processing order to be same as encounter order)!
Okay, the stream generated out of a List
would be ordered and unless the stream is made parallel
before reduce
the following implementation shall work. 2
private void performAllTasks(List<Consumer<T>> consumerList, T data) {
consumerList.stream().reduce(NOOP, Consumer::andThen).accept(data);
}
Q. Does this assumption 2 hold true? Would it be guaranteed to always execute the consumers in the order that the original code had them?
Q. Is there a possibility somehow to expose 1 as well to the callees to perform tasks?
If you have an ordered stream and perform operations which guarantee to maintain the order, it doesn't matter whether the stream is processed in parallel or sequential; the implementation will maintain the order. The ordered property is distinct from parallel vs sequential.
Encounter order If we have an ordered source and perform intermediate and terminal operations which guarantee to maintain the order, it does not matter whether the stream is processed in parallel or sequential, the order will be maintained.
If our Stream is ordered, it doesn't matter whether our data is being processed sequentially or in parallel; the implementation will maintain the encounter order of the Stream.
Collectors preserve encounter order if they are accumulating into a container that itself has an encounter order. For example, collection into an array will place the elements into the array in the encounter order of the stream. This is an example of a terminal operation that requires an ordering on its input.
As Andreas pointed out, Consumer::andThen
is an associative function and while the resulting consumer may have a different internal structure, it is still equivalent.
But let's debug it
public static void main(String[] args) {
performAllTasks(IntStream.range(0, 10)
.mapToObj(i -> new DebuggableConsumer(""+i)), new Object());
}
private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
Consumer<T> reduced = consumerList.reduce(Consumer::andThen).orElse(x -> {});
reduced.accept(data);
System.out.println(reduced);
}
static class DebuggableConsumer implements Consumer<Object> {
private final Consumer<Object> first, second;
private final boolean leaf;
DebuggableConsumer(String name) {
this(x -> System.out.println(name), x -> {}, true);
}
DebuggableConsumer(Consumer<Object> a, Consumer<Object> b, boolean l) {
first = a; second = b;
leaf = l;
}
public void accept(Object t) {
first.accept(t);
second.accept(t);
}
@Override public Consumer<Object> andThen(Consumer<? super Object> after) {
return new DebuggableConsumer(this, after, false);
}
public @Override String toString() {
if(leaf) return first.toString();
return toString(new StringBuilder(200), 0, 0).toString();
}
private StringBuilder toString(StringBuilder sb, int preS, int preEnd) {
int myHandle = sb.length()-2;
sb.append(leaf? first: "combined").append('\n');
if(!leaf) {
int nPreS=sb.length();
((DebuggableConsumer)first).toString(
sb.append(sb, preS, preEnd).append("\u2502 "), nPreS, sb.length());
nPreS=sb.length();
sb.append(sb, preS, preEnd);
int lastItemHandle=sb.length();
((DebuggableConsumer)second).toString(sb.append(" "), nPreS, sb.length());
sb.setCharAt(lastItemHandle, '\u2514');
}
if(myHandle>0) {
sb.setCharAt(myHandle, '\u251c');
sb.setCharAt(myHandle+1, '\u2500');
}
return sb;
}
}
will print
0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─combined
│ │ │ ├─combined
│ │ │ │ ├─combined
│ │ │ │ │ ├─combined
│ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ ├─combined
│ │ │ │ │ │ │ │ ├─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@378fd1ac
│ │ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@49097b5d
│ │ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6e2c634b
│ │ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@37a71e93
│ │ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7e6cbb7a
│ │ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7c3df479
│ │ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7106e68e
│ │ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@7eda2dbb
│ └─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@6576fe71
└─SO$DebuggableConsumer$$Lambda$21/0x0000000840069040@76fb509a
whereas changing the reduction code to
private static <T> void performAllTasks(Stream<Consumer<T>> consumerList, T data) {
Consumer<T> reduced = consumerList.parallel().reduce(Consumer::andThen).orElse(x -> {});
reduced.accept(data);
System.out.println(reduced);
}
prints on my machine
0
1
2
3
4
5
6
7
8
9
combined
├─combined
│ ├─combined
│ │ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@49097b5d
│ │ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6e2c634b
│ └─combined
│ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@37a71e93
│ └─combined
│ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7e6cbb7a
│ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7c3df479
└─combined
├─combined
│ ├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7106e68e
│ └─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@7eda2dbb
└─combined
├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@6576fe71
└─combined
├─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@76fb509a
└─SO$DebuggableConsumer$$Lambda$22/0x0000000840077c40@300ffa5d
illustrating the point of Andreas’ answer, but also highlighting an entirely different problem. You may max it out by using, e.g. IntStream.range(0, 100)
in the example code.
The result of the parallel evaluation is actually better than the sequential evaluation, as the sequential evaluation creates an unbalanced tree. When accepting an arbitrary stream of consumers, this can be an actual performance issue or even lead to a StackOverflowError
when trying to evaluate the resulting consumer.
For any nontrivial number of consumers, you actually want a balanced consumer tree, but using a parallel stream for that is not the right solution, as a) Consumer::andThen
is a cheap operation with no real benefit from parallel evaluation and b) the balancing would depend on unrelated properties, like the nature of the stream source and the number of CPU cores, which determine when the reduction falls back to the sequential algorithm.
Of course, the simplest solution would be
private static <T> void performAllTasks(Stream<Consumer<T>> consumers, T data) {
consumers.forEachOrdered(c -> c.accept(data));
}
But when you want to construct a compound Consumer
for re-using, you may use
private static final int ITERATION_THRESHOLD = 16; // tune yourself
public static <T> Consumer<T> combineAllTasks(Stream<Consumer<T>> consumers) {
List<Consumer<T>> consumerList = consumers.collect(Collectors.toList());
if(consumerList.isEmpty()) return t -> {};
if(consumerList.size() == 1) return consumerList.get(0);
if(consumerList.size() < ITERATION_THRESHOLD)
return balancedReduce(consumerList, Consumer::andThen, 0, consumerList.size());
return t -> consumerList.forEach(c -> c.accept(t));
}
private static <T> T balancedReduce(List<T> l, BinaryOperator<T> f, int start, int end) {
if(end-start>2) {
int mid=(start+end)>>>1;
return f.apply(balancedReduce(l, f, start, mid), balancedReduce(l, f, mid, end));
}
T t = l.get(start++);
if(start<end) t = f.apply(t, l.get(start));
assert start==end || start+1==end;
return t;
}
The code will provide a single Consumer
just using a loop when the number of consumers exceeds a threshold. This is the simplest and most efficient solution for a larger number of consumers and in fact, you could drop all other approaches for the smaller numbers and still get a reasonable performance…
Note that this still doesn’t hinder parallel processing of the stream of consumers, if their construction really benefits from it.
Even if the Stream<Consumer<T>>
is made parallel, the resulting compound Consumer
will execute the individual consumers in order, assuming:
The Stream
is ordered.
A stream sourced by a List
is ordered, even with parallel enabled.
The accumulator
passed to reduce()
is associative.Consumer::andThen
is associative.
Let's say you have a list of 4 consumers [A, B, C, D]
. Normally, without parallel, the following would happen:
x = A.andThen(B);
x = x.andThen(C);
compound = x.andThen(D);
so that calling compound.apply()
would call A
, B
, C
, then D
in that order.
If you enable parallel, the stream framework might instead split that to be processed by 2 threads, [A, B]
by thread 1, and [C, D]
by thread 2.
That means the following will happen:
x = A.andThen(B);
y = C.andThen(D);
compound = x.andThen(y);
The result is that x
is applied first, which means A
then B
, then y
is applied, which means C
then D
.
So although the compound consumer is built like [[A, B], [C, D]]
instead of the left-associative [[[A, B], C], D]
, the 4 consumers are executed in order, all because Consumer::andThen
is associative.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With