
ExecutorService's surprising performance break-even point --- rules of thumb?

I'm trying to figure out how to correctly use Java's Executors. I realize submitting tasks to an ExecutorService has its own overhead. However, I'm surprised to see it is as high as it is.

My program needs to process huge amounts of data (stock market data) with as low latency as possible. Most of the calculations are fairly simple arithmetic operations.

I tried to test something very simple: "Math.random() * Math.random()"

The simplest test runs this computation in a plain loop. The second test does the same computation inside an anonymous Runnable (this is supposed to measure the cost of creating new objects). The third test passes the Runnable to an ExecutorService (this measures the cost of introducing executors).

I ran the tests on my dinky laptop (2 cpus, 1.5 gig ram):

(in milliseconds)
simpleCompuation:47
computationWithObjCreation:62
computationWithObjCreationAndExecutors:422

(about once out of four runs, the first two numbers end up being equal)

Notice that executors take far, far more time than executing on a single thread. The numbers were about the same for thread pool sizes between 1 and 8.

Question: Am I missing something obvious or are these results expected? These results tell me that any task I pass in to an executor must do some non-trivial computation. If I am processing millions of messages, and I need to perform very simple (and cheap) transformations on each message, I still may not be able to use executors...trying to spread computations across multiple CPUs might end up being costlier than just doing them in a single thread. The design decision becomes much more complex than I had originally thought. Any thoughts?


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

 private static int count = 100000;

 public static void main(String[] args) throws InterruptedException {

  //warmup
  simpleCompuation();
  computationWithObjCreation();
  computationWithObjCreationAndExecutors();

  long start = System.currentTimeMillis();
  simpleCompuation();
  long stop = System.currentTimeMillis();
  System.out.println("simpleCompuation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreation();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreation:"+(stop-start));

  start = System.currentTimeMillis();
  computationWithObjCreationAndExecutors();
  stop = System.currentTimeMillis();
  System.out.println("computationWithObjCreationAndExecutors:"+(stop-start));


 }

 private static void computationWithObjCreation() {
  for(int i=0;i<count;i++){
   new Runnable(){

    @Override
    public void run() {
     double x = Math.random()*Math.random();
    }

   }.run();
  }

 }

 private static void simpleCompuation() {
  for(int i=0;i<count;i++){
   double x = Math.random()*Math.random();
  }

 }

 private static void computationWithObjCreationAndExecutors()
   throws InterruptedException {

  ExecutorService es = Executors.newFixedThreadPool(1);
  for(int i=0;i<count;i++){
   es.submit(new Runnable() {
    @Override
    public void run() {
     double x = Math.random()*Math.random();     
    }
   });
  }
  es.shutdown();
  es.awaitTermination(10, TimeUnit.SECONDS);
 }
}
Asked Oct 30 '09 by Shahbaz


3 Answers

  1. Using executors is about utilizing CPUs and/or CPU cores, so if you want a thread pool that makes the best use of the machine, it should have about as many threads as there are CPUs/cores.
  2. You are right that creating new objects costs too much. So one way to reduce the expense is to use batches: if you know the kind and amount of computation to do, group it into batches, i.e. thousand(s) of computations done in one submitted task, with one batch per thread. As soon as a batch is done (tracked via java.util.concurrent.Future), you create the next one. Even the creation of new batches can be done in parallel (4 CPUs -> 3 threads for computation, 1 thread for batch provisioning); see the sketch after this list. In the end you may get more throughput, but at the cost of higher memory demands (batches, provisioning).
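
A rough sketch of that batch-provisioning idea, using Future#get() to notice when a batch has finished before handing the pool the next one. The class and helper names here are made up for illustration and are not part of the answer's code further down:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BatchProvisioningSketch {

    // Hypothetical helper: one submitted task runs many cheap computations.
    private static Runnable batchOf(final int computations) {
        return new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < computations; i++) {
                    double x = Math.random() * Math.random();
                }
            }
        };
    }

    public static void main(String[] args) throws Exception {
        final int cpus = Runtime.getRuntime().availableProcessors();
        final ExecutorService es = Executors.newFixedThreadPool(cpus);

        // Submit one batch per core and keep the Futures so we know when
        // a batch has finished and a follow-up batch can be provisioned.
        final List<Future<?>> inFlight = new ArrayList<Future<?>>();
        for (int i = 0; i < cpus; i++) {
            inFlight.add(es.submit(batchOf(100000)));
        }

        // As soon as a batch completes, hand the pool the next one.
        for (Future<?> done : inFlight) {
            done.get();                   // blocks until that batch has finished
            es.submit(batchOf(100000));   // provision the follow-up batch
        }

        es.shutdown();                                // no new batches after this
        es.awaitTermination(1, TimeUnit.MINUTES);     // let the follow-up batches drain
    }
}

In this sketch the main thread plays the provisioning role; a dedicated provisioning thread (the 3+1 split mentioned above) would work the same way.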

Edit: I changed your example and ran it on my little dual-core x200 laptop.

provisioned 2 batches to be executed
simpleCompuation:14
computationWithObjCreation:17
computationWithObjCreationAndExecutors:9

As you can see in the source code, I also took the batch provisioning and the executor lifecycle out of the measurement, which makes the comparison with the other two methods fairer.

See the results for yourself...

import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecServicePerformance {

    private static int count = 100000;

    public static void main( String[] args ) throws InterruptedException {

        final int cpus = Runtime.getRuntime().availableProcessors();

        final ExecutorService es = Executors.newFixedThreadPool( cpus );

        final Vector< Batch > batches = new Vector< Batch >( cpus );

        final int batchComputations = count / cpus;

        for ( int i = 0; i < cpus; i++ ) {
            batches.add( new Batch( batchComputations ) );
        }

        System.out.println( "provisioned " + cpus + " batches to be executed" );

        // warmup
        simpleCompuation();
        computationWithObjCreation();
        computationWithObjCreationAndExecutors( es, batches );

        long start = System.currentTimeMillis();
        simpleCompuation();
        long stop = System.currentTimeMillis();
        System.out.println( "simpleCompuation:" + ( stop - start ) );

        start = System.currentTimeMillis();
        computationWithObjCreation();
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreation:" + ( stop - start ) );

        // Executor

        start = System.currentTimeMillis();
        computationWithObjCreationAndExecutors( es, batches );    
        es.shutdown();
        es.awaitTermination( 10, TimeUnit.SECONDS );
        // Note: Executor#shutdown() and Executor#awaitTermination() requires
        // some extra time. But the result should still be clear.
        stop = System.currentTimeMillis();
        System.out.println( "computationWithObjCreationAndExecutors:"
                + ( stop - start ) );
    }

    private static void computationWithObjCreation() {

        for ( int i = 0; i < count; i++ ) {
            new Runnable() {

                @Override
                public void run() {

                    double x = Math.random() * Math.random();
                }

            }.run();
        }

    }

    private static void simpleCompuation() {

        for ( int i = 0; i < count; i++ ) {
            double x = Math.random() * Math.random();
        }

    }

    private static void computationWithObjCreationAndExecutors(
            ExecutorService es, List< Batch > batches )
            throws InterruptedException {

        for ( Batch batch : batches ) {
            es.submit( batch );
        }

    }

    private static class Batch implements Runnable {

        private final int computations;

        public Batch( final int computations ) {

            this.computations = computations;
        }

        @Override
        public void run() {

            int countdown = computations;
            while ( countdown-- > 0 ) {
                double x = Math.random() * Math.random();
            }
        }
    }
}
Answered by cafebabe


This is not a fair test for the thread pool, for the following reasons:

  1. You are not taking advantage of the pooling at all because you only have 1 thread.
  2. The job is so simple that the pooling overhead can't be justified. A multiplication on a CPU with an FPU takes only a few cycles.

Consider the following extra steps the thread pool has to take besides creating the object and running the job:

  1. Put the job in the queue
  2. Remove the job from queue
  3. Get the thread from the pool and execute the job
  4. Return the thread to the pool

When you have a real job and multiple threads, the benefit of the thread pool will be apparent.
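
For illustration, here is a minimal sketch of that point: the same set of jobs run once on a single thread and once on a fixed pool, with a per-job body heavy enough that the submit/queue overhead stops mattering. The class and method names are made up for this sketch, and the timings will of course vary by machine:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RealJobSketch {

    // A deliberately non-trivial job: a couple hundred thousand floating-point ops.
    private static double heavyJob(final int seed) {
        double acc = 0;
        for (int i = 1; i <= 200000; i++) {
            acc += Math.sin(seed + i) / i;
        }
        return acc;
    }

    public static void main(String[] args) throws Exception {
        final int jobs = 64;

        // Single-threaded baseline.
        long start = System.currentTimeMillis();
        double single = 0;
        for (int j = 0; j < jobs; j++) {
            single += heavyJob(j);
        }
        System.out.println("single thread: "
                + (System.currentTimeMillis() - start) + " ms (sum=" + single + ")");

        // The same jobs on a pool sized to the machine.
        final ExecutorService es =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        start = System.currentTimeMillis();
        final List<Future<Double>> results = new ArrayList<Future<Double>>();
        for (int j = 0; j < jobs; j++) {
            final int seed = j;
            results.add(es.submit(new Callable<Double>() {
                @Override
                public Double call() {
                    return heavyJob(seed);
                }
            }));
        }
        double pooled = 0;
        for (Future<Double> f : results) {
            pooled += f.get();            // wait for each job's result
        }
        System.out.println("thread pool:   "
                + (System.currentTimeMillis() - start) + " ms (sum=" + pooled + ")");
        es.shutdown();
    }
}

On a multi-core machine the pooled version should finish in a fraction of the single-threaded time, which is exactly the effect the trivial Math.random() job cannot show.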

Answered by ZZ Coder


The 'overhead' you mention has nothing to do with ExecutorService; it is caused by multiple threads contending on Math.random(), which draws from a single shared generator.

So yes, you are missing something (and the accepted 'correct' answer is not actually correct).
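
As an aside: if the shared generator behind Math.random() is the bottleneck, java.util.concurrent.ThreadLocalRandom (Java 7+) gives each thread its own generator, so there is nothing to contend on. A minimal sketch of the per-task computation rewritten that way (the class name is made up for illustration):

import java.util.concurrent.ThreadLocalRandom;

// Hypothetical replacement for the question's anonymous Runnable: each thread
// draws from its own per-thread generator instead of the single shared one
// behind Math.random(), so there is no shared state to contend on.
public class NoSharedRandomTask implements Runnable {

    @Override
    public void run() {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        double x = rnd.nextDouble() * rnd.nextDouble();
    }
}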

Here is some Java 8 code to demonstrate 8 threads running a simple function in which there is no lock contention:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.DoubleFunction;

import com.google.common.base.Stopwatch;

public class ExecServicePerformance {

    private static final int repetitions = 120;
    private static int totalOperations = 250000;
    private static final int cpus = 8;
    private static final List<Batch> batches = batches(cpus);

    private static DoubleFunction<Double> performanceFunc = (double i) -> {return Math.sin(i * 100000 / Math.PI); };

    public static void main( String[] args ) throws InterruptedException {

        printExecutionTime("Synchronous", ExecServicePerformance::synchronous);
        printExecutionTime("Synchronous batches", ExecServicePerformance::synchronousBatches);
        printExecutionTime("Thread per batch", ExecServicePerformance::asynchronousBatches);
        printExecutionTime("Executor pool", ExecServicePerformance::executorPool);

    }

    private static void printExecutionTime(String msg, Runnable f) throws InterruptedException {
        long time = 0;
        for (int i = 0; i < repetitions; i++) {
            Stopwatch stopwatch = Stopwatch.createStarted();
            f.run(); // runs on the current thread; the parallel variants spawn their own threads inside the method and block until they finish
            time += stopwatch.elapsed(TimeUnit.MILLISECONDS);
        }
        System.out.println(msg + " exec time: " + time);
    }    

    private static void synchronous() {
        for ( int i = 0; i < totalOperations; i++ ) {
            performanceFunc.apply(i);
        }
    }

    private static void synchronousBatches() {      
        for ( Batch batch : batches) {
            batch.synchronously();
        }
    }

    private static void asynchronousBatches() {

        CountDownLatch cb = new CountDownLatch(cpus);

        for ( Batch batch : batches) {
            Runnable r = () ->  { batch.synchronously(); cb.countDown(); };
            Thread t = new Thread(r);
            t.start();
        }

        try {
            cb.await();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }        
    }

    private static void executorPool() {

        final ExecutorService es = Executors.newFixedThreadPool(cpus);

        for ( Batch batch : batches ) {
            Runnable r = () ->  { batch.synchronously(); };
            es.submit(r);
        }

        es.shutdown();

        try {
            es.awaitTermination( 10, TimeUnit.SECONDS );
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } 

    }

    private static List<Batch> batches(final int cpus) {
        List<Batch> list = new ArrayList<Batch>();
        for ( int i = 0; i < cpus; i++ ) {
            list.add( new Batch( totalOperations / cpus ) );
        }
        System.out.println("Batches: " + list.size());
        return list;
    }

    private static class Batch {

        private final int operationsInBatch;

        public Batch( final int ops ) {
            this.operationsInBatch = ops;
        }

        public void synchronously() {
            for ( int i = 0; i < operationsInBatch; i++ ) {
                performanceFunc.apply(i);
            }
        }
    }


}

Result timings for 120 repetitions of 250k operations (ms):

  • Synchronous exec time: 9956
  • Synchronous batches exec time: 9900
  • Thread per batch exec time: 2176
  • Executor pool exec time: 1922

Winner: Executor Service.

Answered by adrianos