
Using Java multithreading, what is the most efficient way to coordinate finding the best result?

Let me be clear, the method I describe below is operational. I'm hoping to improve the throughput of the method. It works, and it works quite well. We're looking to scale throughput even more which is why I'm looking into this.

The task at hand is to improve the performance of a scoring algorithm that returns the best score for a collection of tasks. I have a collection of tasks that I score using an ExecutorService. Each task checks whether it now has the best score and, if so, updates the best score in a synchronized fashion. To give some insight into the scale I'm working at: each task takes a fraction of a millisecond to complete, but there are thousands of them, so finding the best one takes several hundred milliseconds. I execute this scoring algorithm several hundred times a minute. The result is that 30 seconds out of every 60 are spent running this scoring algorithm.

When my thread pool is 8 threads (with 24 virtual cores), the tasks take 0.3 ms each. When I have 20 threads (same machine, 24 virtual cores), the tasks take 0.6 ms each. I suspect that as I add more threads to my ExecutorService thread pool, my performance gets worse because of this synchronization on the best score (more threads contending for the lock).

I have done quite a bit of searching, but can't seem to find satisfactory (actually, I can't seem to find any) alternatives. I'm thinking about collecting all scores and either storing in sorted order, or sorting after all tasks are completed--but I'm unsure if this will be any improvement.

Does anyone have any thoughts of another, more efficient way of collecting the best score?

Here's the current methodology:

final double[] bestScore = { Double.MAX_VALUE };
// for each item in the collection {
    tasks.add(Executors.callable(new Runnable() {
        public void run() {
            double score = //... do the scoring for the task
            if (score < bestScore[0]) {
                synchronized(bestScore) {
                    if (score < bestScore[0]) { // check again after we have the lock
                        bestScore[0] = score;
                        ...
                        // also save off other task identifiers in a similar fashion
                    }
                }
            }
        }
    }));
} // end of loop creating scoring tasks

List<Future<Object>> futures = executorService.invokeAll(tasks /*...timeout params here*/);
... // handle cancelled tasks 

// now use the best scoring task that was saved off when it was found.
jadz asked May 15 '15 at 03:05



3 Answers

Let's say that you have 10k scores and you need to find the best score among all of them. Take your 10k scores and divide them up by the number of threads: if you want 10 threads, each thread gets 1000.

Now each thread can find the best score among its 1000 completely in parallel, with no shared state to lock. When all 10 results are returned, you only need to find the best of those 10 to get the overall best.
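A minimal sketch of this chunking idea, assuming (as in the question's code) that the lowest score is the best one. The class name ChunkedBest, the method lowestScore, and the scorer parameter are hypothetical stand-ins for the real scoring logic, not anything from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.ToDoubleFunction;

final class ChunkedBest {
    /** Returns the lowest score over items; each chunk is scored by one task. */
    static <T> double lowestScore(List<T> items, ToDoubleFunction<T> scorer, int threads) {
        // A real service would reuse one pool; it is created here to stay self-contained.
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Ceiling division, so at most `threads` chunks are created.
            int chunkSize = Math.max(1, (items.size() + threads - 1) / threads);
            List<Callable<Double>> chunks = new ArrayList<>();
            for (int start = 0; start < items.size(); start += chunkSize) {
                List<T> chunk = items.subList(start, Math.min(start + chunkSize, items.size()));
                chunks.add(() -> {
                    // Local to this task, so no lock or atomic is needed.
                    double localBest = Double.POSITIVE_INFINITY;
                    for (T item : chunk) {
                        localBest = Math.min(localBest, scorer.applyAsDouble(item));
                    }
                    return localBest;
                });
            }
            // Combine the per-chunk results on the calling thread.
            double best = Double.POSITIVE_INFINITY;
            for (Future<Double> f : pool.invokeAll(chunks)) {
                best = Math.min(best, f.get());
            }
            return best;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while scoring", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a scoring chunk failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

With this layout the only cross-thread coordination is the final pass over one result per chunk, and far fewer tasks are submitted to the executor.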

EricF answered Sep 28 '22 at 14:09



I'll have to take for granted the fact that you want to compute each individual score as a separate task submitted to an ExecutorService. There must be other benefits, otherwise the overhead isn't worth it. Normally, you'd implement a Callable that returns the score (or an object with the score and other pertinent results) when executed. After successful invocation of all tasks, all results would be examined in the main thread to obtain the best.

Given your constraints, however, one optimization you could try is using a DoubleAccumulator, which was intended for cases like these, instead of your one-element array and synchronization. It would look something like this:

final DoubleAccumulator lowest = new DoubleAccumulator(Math::min, Double.POSITIVE_INFINITY);
/* Loop, creating all the tasks... */
for ( ... ) {
  tasks.add(Executors.callable(new Runnable() {
    public void run()
    {
      double score = 0; /* Compute a real score here. */
      lowest.accumulate(score);
    }
  }));
}
/* Invoke all the tasks, when successful... */
double lowestScore = lowest.get();

If you need to track information besides the score, you can do something similar with AtomicReference: create an immutable data object that carries the task identifier, score, and any other needed properties, and update the reference with its accumulateAndGet (or getAndAccumulate) method.
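As a rough sketch of the AtomicReference variant, assuming the lowest score wins: the BestTracker and Result classes and the taskId field are hypothetical names chosen for illustration. AtomicReference.accumulateAndGet is the standard Java 8 method; the function passed to it may be retried under contention, so it must be free of side effects.

```java
import java.util.concurrent.atomic.AtomicReference;

final class BestTracker {
    /** Immutable holder for one task's outcome; taskId is a hypothetical identifier. */
    static final class Result {
        final int taskId;
        final double score;

        Result(int taskId, double score) {
            this.taskId = taskId;
            this.score = score;
        }
    }

    // Starts out null, meaning "no result offered yet".
    private final AtomicReference<Result> best = new AtomicReference<>();

    /** Called by each task; atomically keeps whichever result has the lower score. */
    void offer(Result candidate) {
        best.accumulateAndGet(candidate, (current, next) ->
                (current == null || next.score < current.score) ? next : current);
    }

    /** Returns the best result seen so far, or null if none was offered. */
    Result get() {
        return best.get();
    }
}
```

Each task would call offer(new BestTracker.Result(id, score)) instead of entering a synchronized block, and the main thread would read get() after invokeAll returns.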

If your tasks are initialized by some sort of recursive, divide-and-conquer approach, resulting in non-blocking, equally-sized tasks, the fork-join framework underlying a parallel Stream might be a good fit too.
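For comparison, a parallel Stream version might look like the sketch below. It assumes the items are already in a List and that the lowest score is the best; StreamBest and scorer are hypothetical names. The work runs on the common fork-join pool, so this only fits if the scoring code does not block.

```java
import java.util.List;
import java.util.OptionalDouble;
import java.util.function.ToDoubleFunction;

final class StreamBest {
    /** Scores every item in parallel and returns the lowest score, or empty if there are no items. */
    static <T> OptionalDouble lowestScore(List<T> items, ToDoubleFunction<T> scorer) {
        // The stream splits the list into ranges, reduces each range locally,
        // then merges the partial minima, so no shared mutable state is involved.
        return items.parallelStream()
                .mapToDouble(scorer)
                .min();
    }
}
```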

Again, though, I would point out that if more threads decreased performance, measuring the use of even fewer threads seems prudent.

erickson answered Sep 28 '22 at 14:09



I have a few concerns. In your code, bestScore has only one element, so why do you need an array? And why have you set its value to the maximum allowed value for a double? In that case, won't it always be the best score?

It also seems you need to make sure all your tasks execute, because only then will you know the best score among the tasks. I recommend creating a new Callable that computes a score for each of these tasks, something like:

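import java.util.concurrent.Callable;

// One instance per task; call() runs on a pool thread and returns that task's score.
public class ScoreComputer implements Callable<Double> {
    @Override
    public Double call() throws Exception {
        double score = 0;
        // Compute the real score here.
        return score;
    }
}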

Then, for each task, submit a ScoreComputer. The submit call returns a Future<Double> that holds the result once the computation is over. You can then find the minimum of all the computed results and compare it with your existing best score.

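import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public static void main(String[] args) throws ExecutionException, InterruptedException {
    double bestScore = Double.MAX_VALUE;
    List<Future<Double>> futures = new ArrayList<>();
    ExecutorService service = Executors.newCachedThreadPool();
    // For each item in the collection, create a task and submit it.
    futures.add(service.submit(new ScoreComputer()));

    // Collect every result; get() blocks until that task has finished.
    List<Double> scores = new ArrayList<>();
    for (Future<Double> future : futures) {
        scores.add(future.get());
    }
    service.shutdown(); // let idle pool threads exit so the JVM can terminate

    // Lower is better, so replace bestScore only if the tasks found a lower one.
    Double bestScoreInTasks = Collections.min(scores);
    if (bestScoreInTasks < bestScore) {
        bestScore = bestScoreInTasks;
    }
    System.out.println(bestScore);
}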

I hope this gives you some ideas. Also, since your tasks are very short-lived, IMO using a cached thread pool makes sense here. As per the Javadoc:

newCachedThreadPool() creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available. These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks.

akhil_mittal answered Sep 28 '22 at 16:09
