Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Best way to sum concurrently

I am trying to compute some big numbers. In order to speed up computation, I would like to make use of multithreading. Each thread should calculate a number and in the end a sum is calculated.

I once saw something that worked with a SumThread and a Collector that looked as follows:

public BigInteger compute(int p) {
    Collector c = new Collector(p);

    for(T element : Collection<T> bigCollection) {
        new SumThread(c) {

            @Override
            protected void doTheJob() {
                long big = someVeryComplexCalculation(element, ...); //n!
                receive(BigInteger.valueOf(big));
            }

        }
    }

    if(collector.isReady())
        return collector.getResult();

    return null;
}

public class Collector {

    private int numberOfProcesses;
    private int numberOfAllowedProcesses;
    private BigInteger result;

    public Collector(int n) {
        numberOfAllowedProcesses = n;
        numberOfProcesses = 0;
        result = BigInteger.ZERO;
    }

    synchronized public void enter() throws InterruptedException {
        if (numberOfProcesses == numberOfAllowedProcesses) wait();
        numberOfProcesses++;
    }

    synchronized public void leave() {
        numberOfProcesses--;
        notify();
    }

    synchronized public void register(BigInteger v) {
        result = result.add(v);
    }

    synchronized public boolean isReady() throws InterruptedException {
        while (numberOfProcesses > 0) wait();
        return true;
    }

    ...
}

public abstract class SumThread extends Thread {

    private Collector collector;

    public SumThread(Collector c) throws InterruptedException {
        collector = c;
        collector.enter();
    }

    abstract protected void doTheJob(); //complex calculations can be done in here

    public void receive(BigInteger t) {
        collector.register(t);
    }

    public void run() {
        doTheJob();
        collector.leave();
    }
}

I thought I could easily outperform this by using ExecutorService instead of making new Threads constantly like:

public BigInteger compute(int p) {
    ExecutorService pool = Executors.newFixedThreadPool(p);
    Future<BigInteger>[] futures = new Future<BigInteger>[bigCollection.size()];
    int i = 0;

    for(T element : Collection<T> bigCollection) {
        futures[i++] = p.submit(new Callable<BigInteger>() {

            @Override
            public BigInteger call() {
                long big = someVeryComplexCalculation(element, ...); //n!
                return BigInteger.valueOf(big);
            }

        }
    }

    // or with ExecutorCompletionService, but the loop remains I guess
    BigInteger res = BigInteger.ZERO
    for(Future<BigInteger> f : futures)
        res = res.add(f.get());

    return res;
}

This code didn't manage to outperform the SumThread-Collector solution however. I've also seen things about LongAdder for instance, but I would need some adder for BigIntegers...

My question thus is: what is the best way to calculate a sum concurrently? Is it one of the above or is there a completely different (but better) way?

like image 707
Mr Tsjolder Avatar asked Aug 21 '15 07:08

Mr Tsjolder


2 Answers

As you are mentioned LongAdder which was added in Java-8 and use effectively-final variables, I assume that you are using Java-8. In this version the best way to solve your task is to use the Stream API:

BigInteger result = bigCollection.parallelStream()
                     .map(e -> BigInteger.valueOf(someVeryComplexCalculation(e, ...)))
                     .reduce(BigInteger.ZERO, BigInteger::add);

Your problem is the classical map-reduce task where you should transform each element of some collection, then combine the results of the individual transformations into the final result. The Stream API is capable to parallelize such tasks quite effectively without any manual work. In Oracle JDK the tasks are executed in the common ForkJoinPool pool which by default creates as many threads as many CPU cores you have.

like image 132
Tagir Valeev Avatar answered Sep 19 '22 00:09

Tagir Valeev


Tou have two solutions:

In first, I would propose to use Fork-Join Framework from JDK7 for this task:

  • https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
  • http://www.javacodegeeks.com/2013/02/java-7-forkjoin-framework-example.html

You would need to implement an RecursiveTask

As a second solution (JDK8) would be to use pararell stream, just as @tagir-valeev has proposed.

In both cases it depends what you are in for, and what Java version you are using.

like image 20
Beri Avatar answered Sep 21 '22 00:09

Beri