Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

High-performance buffering for a stream of rands

I have code that consumes a large number (millions currently, eventually billions) of relatively short (5-100 elements) arrays of random numbers and does some not-very-strenuous math with them. Random numbers being, well, random, ideally I'd like to generate them on multiple cores, since random number generation is > 50% of my runtime in profiling. However, I'm having difficulty distributing a large number of small tasks in a way that's not slower than the single-threaded approach.

My code currently looks something like this:

for(int i=0;i<1000000;i++){
    for(RealVector d:data){
        while(!converged){
            double[] shortVec = new double[5];
            for(int i=0;i<5;i++) shortVec[i]=rng.nextGaussian();
            double[] longerVec = new double[50];
            for(int i=0;i<50;i++) longerVec[i]=rng.nextGaussian();
            /*Do some relatively fast math*/
        }
    }
}

Approaches I've taken that have not worked are:

  • 1+ threads populating an ArrayBlockingQueue, and my main loop consuming and populating the array (the boxing/unboxing was killer here)
  • Generating the vectors with a Callable (yielding a future) while doing the non-dependent parts of the math (it appears the overhead of the indirection outweighed whatever parallelism gains I got)
  • Using 2 ArrayBlockingQueue, each populated by a thread, one for the short and one for the long arrays (still roughly twice as slow as the direct single-threaded case).

I'm not looking for "solutions" to my particular problem so much as how to handle the general case of generating large streams of small, independent primitives in parallel and consuming them from a single thread.

like image 646
Bryce Avatar asked Aug 03 '12 18:08

Bryce


2 Answers

The problem with your performance seems to be that the individual jobs are too small so most of the time is spent doing the synchronization and queueing of the jobs themselves. One thing to consider is not to generate a large stream of small jobs but to deliver to each working thread a medium sized collection of jobs that it will annotate with the answer.

For example, instead of iterating through your loop with the first thread doing iteration #0, the next thread doing iteration #1, ... I would have the first thread do iterations #0 through #999 or some such. They should be working independently and annotate a Job class with the answer of their calculations. Then at the end they can return the entire collection of the jobs that have been finished as a Future.

Your Job class might be something like the following:

public class Job {
    Collection<RealVector> dataCollection;
    Collection<SomeAnswer> answerCollection = new ArrayList<SomeAnswer>();
    public void run() {
        for (RealVector d : dataCollection) {
           // do the magic work on the vector
           while(!converged){
              ...
           }
           // put the associated "answer" in another collection
           answerCollection.add(someAnswer);
        }
    }
}
like image 189
Gray Avatar answered Sep 30 '22 19:09

Gray


This is more efficient than using a Queue because;

  • the payload is an array of double[] meaning the background thread can generate more data before having to pass it off.
  • all the objects are recycled.

.

public class RandomGenerator {
    private final ExecutorService generator = Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "generator");
            t.setDaemon(true);
            return t;
        }
    });
    private final Exchanger<double[][]> exchanger = new Exchanger<>();
    private double[][] buffer;
    private int nextRow = Integer.MAX_VALUE;

    public RandomGenerator(final int rows, final int columns) {
        buffer = new double[rows][columns];
        generator.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                Random random = new Random();
                double[][] buffer2 = new double[rows][columns];
                while (!Thread.interrupted()) {
                    for (int r = 0; r < rows; r++)
                        for (int c = 0; c < columns; c++)
                            buffer2[r][c] = random.nextGaussian();
                    buffer2 = exchanger.exchange(buffer2);
                }
                return null;
            }
        });
    }

    public double[] nextArray() throws InterruptedException {
        if (nextRow >= buffer.length) {
            buffer = exchanger.exchange(buffer);
            nextRow = 0;
        }
        return buffer[nextRow++];
    }
}

Random is thread safe and synchronized. This means each thread needs it own Random to perform concurrently.

how to handle the general case of generating large streams of small, independent primitives in parallel and consuming them from a single thread.

I would use an Exchanger<double[][]> to populate values in the background as pass them efficiently (without much GC overhead)

like image 30
Peter Lawrey Avatar answered Sep 30 '22 19:09

Peter Lawrey