Java parallel streams: is there a way to traverse a binary tree?

I'm struggling to find a proper way to get a speedup from this stream:

    StreamSupport.stream(new BinaryTreeSpliterator(root), true)
                .parallel()
                .map(node -> processor.onerousFunction(node.getValue()))
                .mapToInt(i -> i.intValue())
                .sum()

onerousFunction() is just a function that makes the thread work for a bit and returns the int value of the node.

No matter how many CPUs I use, the execution time always stays the same. I think the problem lies in the Spliterator I wrote:

    public class BinaryTreeSpliterator extends AbstractSpliterator<Node> {

        private LinkedBlockingQueue<Node> nodes = new LinkedBlockingQueue<>();

        public BinaryTreeSpliterator(Node root) {
            super(Long.MAX_VALUE, NONNULL | IMMUTABLE);
            this.nodes.add(root);
        }

        @Override
        public boolean tryAdvance(Consumer<? super Node> action) {
            Node current = this.nodes.poll();
            if(current != null) {
                action.accept(current);
                if(current.getLeft() != null) 
                    this.nodes.offer(current.getLeft());
                if(current.getRight() != null)
                    this.nodes.offer(current.getRight());
                return true;
            }
            return false;
        }

    }

But I really can't find a good solution.

Stefano Silvi asked Jan 08 '18 10:01


1 Answer

To process data in parallel, you need a trySplit implementation that returns part of the data as a new Spliterator instance. Each spliterator instance is traversed by a single thread, so, incidentally, you don’t need a thread-safe collection within your spliterator. Your actual problem is that you are inheriting the trySplit implementation from AbstractSpliterator, which attempts to provide some parallel support despite knowing nothing about your data.
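As a small illustration of that contract, here is a sketch that splits an array-backed spliterator by hand. The class and method names (TrySplitDemo, drain, splitOnce) are made up for this example; only Arrays.spliterator, trySplit and forEachRemaining are JDK API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

class TrySplitDemo {
    /** Drains a spliterator into a list, the way a single worker thread would. */
    static List<Integer> drain(Spliterator<Integer> s) {
        List<Integer> out = new ArrayList<>();
        s.forEachRemaining(out::add);
        return out;
    }

    /** Splits an array-backed spliterator once and returns both parts. */
    static List<List<Integer>> splitOnce(Integer[] data) {
        Spliterator<Integer> rest = Arrays.spliterator(data);
        // trySplit hands part of the data to a NEW spliterator instance;
        // "rest" keeps only what was not handed off.
        Spliterator<Integer> prefix = rest.trySplit();
        List<List<Integer>> parts = new ArrayList<>();
        parts.add(drain(prefix));
        parts.add(drain(rest));
        return parts;
    }

    public static void main(String[] args) {
        // prints [[1, 2, 3, 4], [5, 6, 7, 8]]
        System.out.println(splitOnce(new Integer[] {1, 2, 3, 4, 5, 6, 7, 8}));
    }
}
```

The two parts are disjoint and can be traversed by different threads without any synchronization, which is why a plain ArrayDeque is sufficient inside a spliterator.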

It does so by requesting some items sequentially, buffering them into an array, and returning a new array-based spliterator. Unfortunately, it does not handle “unknown size” very well (the same applies to the parallel stream implementation in general). It buffers 1024 elements by default and buffers even more the next time, if there are that many elements. Even worse, the stream implementation will not use the array-based spliterator’s good splitting capabilities: it treats “unknown size” like the literal Long.MAX_VALUE, concludes that your spliterator has many more elements than the 1024 in the array, and hence does not even try to split the array-based spliterator.
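The batching described above can be observed directly. The following is a minimal sketch, assuming a made-up source (Counting) that emits consecutive integers while reporting an unknown size; the exact batch sizes are an implementation detail of the JDK, not part of the specification.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;

class DefaultSplitDemo {
    /** Hypothetical source: emits 0..limit-1 but reports Long.MAX_VALUE (unknown size). */
    static final class Counting extends Spliterators.AbstractSpliterator<Integer> {
        private final int limit;
        private int next;

        Counting(int limit) {
            super(Long.MAX_VALUE, NONNULL | IMMUTABLE);
            this.limit = limit;
        }

        @Override
        public boolean tryAdvance(Consumer<? super Integer> action) {
            if (next >= limit) return false;
            action.accept(next++);
            return true;
        }
    }

    /** Sizes of the first two chunks handed out by the inherited trySplit. */
    static long[] firstTwoBatchSizes(int limit) {
        Spliterator<Integer> source = new Counting(limit);
        Spliterator<Integer> first = source.trySplit();  // array-based, so its size is known
        Spliterator<Integer> second = source.trySplit(); // a larger batch than the first
        return new long[] { first.estimateSize(), second.estimateSize() };
    }

    public static void main(String[] args) {
        long[] sizes = firstTwoBatchSizes(10_000);
        System.out.println(sizes[0] + ", then " + sizes[1]);
    }
}
```

Note that every element of a batch is pulled through tryAdvance by the splitting thread, so the tree walk itself stays sequential; only the work done downstream on the buffered elements can run in parallel.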

Your spliterator can implement a much more suitable trySplit method:

public class BinaryTreeSpliterator extends AbstractSpliterator<Node> {
    /**
     * a node that has not been traversed, but its children are only
     * traversed if contained in this.pending
     * (otherwise a different spliterator might be responsible)
     */
    private Node pendingNode;
    /** pending nodes needing full traversal */
    private ArrayDeque<Node> pending = new ArrayDeque<>();

    public BinaryTreeSpliterator(Node root) {
        super(Long.MAX_VALUE, NONNULL | IMMUTABLE);
        push(root);
    }

    private BinaryTreeSpliterator(Node pending, Node next) {
        super(Long.MAX_VALUE, NONNULL | IMMUTABLE);
        pendingNode = pending;
        if(next!=null) this.pending.offer(next);
    }
    private void push(Node n) {
        if(pendingNode == null) {
            pendingNode = n;
            if(n != null) {
                if(n.getRight()!=null) pending.offerFirst(n.getRight());
                if(n.getLeft() !=null) pending.offerFirst(n.getLeft());
            }
        }
        else if(n != null) pending.offerFirst(n); // ArrayDeque rejects null, e.g. a missing left child
    }

    @Override
    public boolean tryAdvance(Consumer<? super Node> action) {
        Node current = pendingNode;
        if(current == null) {
            current = pending.poll();
            if(current == null) return false;
            push(current.getRight());
            push(current.getLeft());
        }
        else pendingNode = null;
        action.accept(current);
        return true;
    }

    @Override
    public Spliterator<Node> trySplit() {
        Node next = pending.poll();
        if(next == null) return null;
        if(pending.isEmpty()) {
            pending.offer(next);
            next = null;
        }
        if(pendingNode==null) return next==null? null: new BinaryTreeSpliterator(next);
        Spliterator<Node> s = new BinaryTreeSpliterator(pendingNode, next);
        pendingNode = null;
        return s;
    }
}

Note that this spliterator would also qualify as an ORDERED spliterator, maintaining a top-left-right order. An entirely unordered spliterator could be implemented in a slightly simpler way.
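One possible shape for such an unordered spliterator is sketched below. This is not the code from the answer above, just an illustration under stated assumptions: the Node class is a stand-in for the question's class (only getValue, getLeft and getRight are taken from the question), and UnorderedTreeSpliterator, UnorderedDemo and build are names invented here. Each instance owns a set of whole subtrees plus a set of single nodes that were already detached from their children.

```java
import java.util.ArrayDeque;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

class UnorderedDemo {
    /** Builds a complete tree using heap numbering: the children of i are 2i and 2i+1. */
    static Node build(int value, int max) {
        return value > max ? null : new Node(value, build(2 * value, max), build(2 * value + 1, max));
    }

    public static void main(String[] args) {
        Node root = build(1, 1023);
        int sum = StreamSupport.stream(new UnorderedTreeSpliterator(root), true)
                               .mapToInt(Node::getValue)
                               .sum();
        System.out.println(sum); // sum of 1..1023
    }
}

/** Hypothetical stand-in for the question's Node class. */
final class Node {
    private final int value;
    private final Node left, right;
    Node(int value, Node left, Node right) { this.value = value; this.left = left; this.right = right; }
    int getValue() { return value; }
    Node getLeft() { return left; }
    Node getRight() { return right; }
}

final class UnorderedTreeSpliterator implements Spliterator<Node> {
    /** Roots whose entire subtree still belongs to this spliterator. */
    private final ArrayDeque<Node> subtrees = new ArrayDeque<>();
    /** Nodes to report on their own; their children are tracked elsewhere. */
    private final ArrayDeque<Node> singles = new ArrayDeque<>();

    UnorderedTreeSpliterator(Node root) {
        if (root != null) subtrees.add(root);
    }

    /** Detaches a node from its children, which become independent subtrees. */
    private void expand(Node n) {
        singles.add(n);
        if (n.getLeft() != null) subtrees.add(n.getLeft());
        if (n.getRight() != null) subtrees.add(n.getRight());
    }

    @Override
    public boolean tryAdvance(Consumer<? super Node> action) {
        if (singles.isEmpty()) {
            Node n = subtrees.poll();
            if (n == null) return false;
            expand(n);
        }
        action.accept(singles.poll());
        return true;
    }

    @Override
    public Spliterator<Node> trySplit() {
        // Expand until there are at least two subtrees, then give one away.
        while (subtrees.size() == 1) expand(subtrees.poll());
        if (subtrees.size() < 2) return null;
        return new UnorderedTreeSpliterator(subtrees.pollLast());
    }

    @Override
    public long estimateSize() { return Long.MAX_VALUE; } // size is unknown

    @Override
    public int characteristics() { return NONNULL | IMMUTABLE; }
}
```

Because the size stays unknown, the stream implementation will keep calling trySplit until it returns null, so this sketch splits all the way down to the leaves. That is acceptable when the per-node work is expensive, as with onerousFunction, but it is not tuned for cheap operations.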

For BinaryTreeSpliterator, you may implement a more efficient forEachRemaining method than the inherited default, e.g.

@Override
public void forEachRemaining(Consumer<? super Node> action) {
    Node current = pendingNode;
    if(current != null) {
        pendingNode = null;
        action.accept(current);
    }
    for(;;) {
        current = pending.poll();
        if(current == null) break;
        traverseLocal(action, current);
    }
}
private void traverseLocal(Consumer<? super Node> action, Node current) {
    do {
        action.accept(current);
        Node child = current.getLeft();
        if(child!=null) traverseLocal(action, child);
        current = current.getRight();
    } while(current != null);
}

but this method might cause a StackOverflowError if your application has to deal with unbalanced trees (specifically, very long left paths in this example).
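If deep trees are a concern, the recursion can be replaced by an explicit stack. The following is a sketch of that idea, not part of the original answer: Node is a stand-in for the question's class, and IterativeTraversal and leftChain are names made up for this example. It visits nodes in the same top-left-right order as the recursive traverseLocal.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class IterativeTraversal {
    /** Visits root and all of its descendants top-left-right without recursion. */
    static void traverseLocal(Consumer<? super Node> action, Node root) {
        ArrayDeque<Node> stack = new ArrayDeque<>();
        stack.push(root);
        while (!stack.isEmpty()) {
            Node current = stack.pop();
            action.accept(current);
            // Push right first so that the left child is popped (visited) first.
            if (current.getRight() != null) stack.push(current.getRight());
            if (current.getLeft() != null) stack.push(current.getLeft());
        }
    }

    /** Builds a degenerate tree consisting only of left children (length >= 1). */
    static Node leftChain(int length) {
        Node chain = null;
        for (int value = length; value >= 1; value--) chain = new Node(value, chain, null);
        return chain;
    }

    public static void main(String[] args) {
        Node tree = new Node(1,
                new Node(2, new Node(4, null, null), new Node(5, null, null)),
                new Node(3, null, null));
        List<Integer> seen = new ArrayList<>();
        traverseLocal(n -> seen.add(n.getValue()), tree);
        System.out.println(seen); // prints [1, 2, 4, 5, 3]

        int[] count = {0};
        traverseLocal(n -> count[0]++, leftChain(1_000_000));
        System.out.println(count[0]); // prints 1000000
    }
}

/** Hypothetical stand-in for the question's Node class. */
final class Node {
    private final int value;
    private final Node left, right;
    Node(int value, Node left, Node right) { this.value = value; this.left = left; this.right = right; }
    int getValue() { return value; }
    Node getLeft() { return left; }
    Node getRight() { return right; }
}
```

The trade-off is a heap-allocated stack instead of call frames, so the depth is limited by available memory rather than by the thread's stack size.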

Holger answered Sep 25 '22 14:09