I have code that consumes a large number (millions currently, eventually billions) of relatively short (5-100 elements) arrays of random numbers and does some not-very-strenuous math with them. Random numbers being, well, random, ideally I'd like to generate them on multiple cores, since random number generation is > 50% of my runtime in profiling. However, I'm having difficulty distributing a large number of small tasks in a way that's not slower than the single-threaded approach.
My code currently looks something like this:
```java
for (int i = 0; i < 1000000; i++) {
    for (RealVector d : data) {
        while (!converged) {
            double[] shortVec = new double[5];
            for (int j = 0; j < 5; j++) shortVec[j] = rng.nextGaussian();
            double[] longerVec = new double[50];
            for (int k = 0; k < 50; k++) longerVec[k] = rng.nextGaussian();
            /* Do some relatively fast math */
        }
    }
}
```
Approaches I've taken that have not worked are:
I'm not looking for "solutions" to my particular problem so much as how to handle the general case of generating large streams of small, independent primitives in parallel and consuming them from a single thread.
The problem with your performance seems to be that the individual jobs are too small, so most of the time is spent synchronizing and queueing the jobs themselves. Instead of generating a large stream of small jobs, consider delivering to each worker thread a medium-sized collection of jobs that it annotates with the answers.
For example, instead of iterating through your loop with the first thread doing iteration #0, the next thread doing iteration #1, and so on, I would have the first thread do iterations #0 through #999 or some such. The threads should work independently and annotate a Job class with the answers of their calculations. At the end, each one can return the entire collection of finished jobs as a Future.
Your Job class might be something like the following:
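```java
import java.util.ArrayList;
import java.util.Collection;

public class Job {
    // The medium-sized batch of inputs this worker handles (RealVector as in the question).
    private final Collection<RealVector> dataCollection;
    private final Collection<SomeAnswer> answerCollection = new ArrayList<SomeAnswer>();

    public Job(Collection<RealVector> dataCollection) {
        this.dataCollection = dataCollection;
    }

    public void run() {
        for (RealVector d : dataCollection) {
            // do the magic work on the vector
            while (!converged) {
                ...
            }
            // put the associated "answer" in another collection
            answerCollection.add(someAnswer);
        }
    }
}
```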
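A minimal sketch of the batching idea with an ExecutorService, assuming plain double[] answers instead of the question's RealVector and SomeAnswer types, and a hypothetical sum-of-squares standing in for the real math. Each task covers a contiguous range of iterations (e.g. #0 through #999), uses its pool thread's own ThreadLocalRandom, and returns all of its answers through a single Future. ChunkedJobs, work and run are illustrative names, not part of the answer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;

final class ChunkedJobs {

    /** Hypothetical stand-in for the "relatively fast math": sum of squares of fresh Gaussian vectors. */
    static double work(ThreadLocalRandom rng) {
        double[] shortVec = new double[5];
        for (int j = 0; j < shortVec.length; j++) shortVec[j] = rng.nextGaussian();
        double[] longerVec = new double[50];
        for (int k = 0; k < longerVec.length; k++) longerVec[k] = rng.nextGaussian();
        double s = 0;
        for (double v : shortVec) s += v * v;
        for (double v : longerVec) s += v * v;
        return s;
    }

    /** Splits iterations [0, n) into chunks; each chunk is one task that returns all of its answers at once. */
    static double[] run(int n, int chunkSize, int threads) throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<double[]>> futures = new ArrayList<>();
            for (int start = 0; start < n; start += chunkSize) {
                final int from = start;
                final int to = Math.min(n, start + chunkSize);
                futures.add(pool.submit(() -> {
                    // The task runs on one pool thread and uses that thread's own generator (no contention).
                    ThreadLocalRandom rng = ThreadLocalRandom.current();
                    double[] answers = new double[to - from];
                    for (int i = from; i < to; i++) answers[i - from] = work(rng);
                    return answers;
                }));
            }
            // Collect in submission order so the answer for iteration i lands at index i.
            double[] all = new double[n];
            int offset = 0;
            for (Future<double[]> f : futures) {
                double[] chunk = f.get();
                System.arraycopy(chunk, 0, all, offset, chunk.length);
                offset += chunk.length;
            }
            return all;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        double[] answers = run(1_000_000, 1_000, Runtime.getRuntime().availableProcessors());
        System.out.println("computed " + answers.length + " answers");
    }
}
```

With 1,000 iterations per task, the executor's queueing and Future overhead is paid once per thousand work items rather than once per item. The chunk size here is a tuning knob to measure, not a known optimum.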
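java.util.Random is thread-safe, but its nextGaussian() is synchronized (and the seed is updated with an atomic compare-and-set), so threads sharing one instance contend with each other. This means each thread needs its own Random to generate values concurrently.

As for "how to handle the general case of generating large streams of small, independent primitives in parallel and consuming them from a single thread": I would use an Exchanger<double[][]> to populate values in the background and pass them efficiently (without much GC overhead).

```java
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class RandomGenerator {
    // A single daemon thread fills buffers in the background; being a daemon, it never blocks JVM exit.
    private final ExecutorService generator = Executors.newSingleThreadExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "generator");
            t.setDaemon(true);
            return t;
        }
    });
    // Swaps a full buffer (from the generator) for a used one (from the consumer).
    private final Exchanger<double[][]> exchanger = new Exchanger<>();
    private double[][] buffer;
    private int nextRow = Integer.MAX_VALUE; // forces an exchange on the first call

    public RandomGenerator(final int rows, final int columns) {
        buffer = new double[rows][columns];
        generator.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                Random random = new Random(); // used only by this thread, so no contention
                double[][] buffer2 = new double[rows][columns];
                while (!Thread.interrupted()) {
                    for (int r = 0; r < rows; r++)
                        for (int c = 0; c < columns; c++)
                            buffer2[r][c] = random.nextGaussian();
                    // Hand over the filled buffer and get the consumer's used one back to refill.
                    buffer2 = exchanger.exchange(buffer2);
                }
                return null;
            }
        });
    }

    /**
     * Returns the next row of Gaussian values. Rows are recycled: once this buffer is used up
     * and exchanged, the generator overwrites it, so copy any row you need to keep longer.
     */
    public double[] nextArray() throws InterruptedException {
        if (nextRow >= buffer.length) {
            buffer = exchanger.exchange(buffer);
            nextRow = 0;
        }
        return buffer[nextRow++];
    }
}
```

This is more efficient than passing each double[] through a Queue because the two threads synchronize only once per double[][] batch rather than once per double[], and the same two buffers are swapped back and forth, so very little garbage is created. Using more rows per batch also means the background thread can generate more data before having to pass it off.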
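The Exchanger approach pairs exactly one generator thread with one consumer. If a single background thread cannot keep up, one possible extension (a swapped-in technique, not part of the answer above) is several producer threads, each with its own ThreadLocalRandom, filling double[][] batches that travel through two ArrayBlockingQueues: one holding filled batches for the consumer and one holding used batches to be refilled, so buffers are recycled much as with the Exchanger. ParallelGaussianSource and its constructor parameters are hypothetical names; treat this as a sketch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadLocalRandom;

/** Several daemon producer threads fill batches of Gaussian rows; a single thread consumes them. */
final class ParallelGaussianSource {
    private final BlockingQueue<double[][]> full;  // filled batches waiting for the consumer
    private final BlockingQueue<double[][]> empty; // used batches waiting to be refilled
    private double[][] current;                    // batch the consumer is currently reading
    private int nextRow;

    ParallelGaussianSource(int producers, int batches, int rows, int columns) {
        full = new ArrayBlockingQueue<>(batches);
        empty = new ArrayBlockingQueue<>(batches);
        for (int b = 0; b < batches; b++) empty.add(new double[rows][columns]);
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(this::produce, "generator-" + p);
            t.setDaemon(true); // do not keep the JVM alive
            t.start();
        }
    }

    private void produce() {
        // ThreadLocalRandom.current() gives each producer thread its own generator, so there is no contention.
        ThreadLocalRandom rng = ThreadLocalRandom.current();
        try {
            while (!Thread.currentThread().isInterrupted()) {
                double[][] batch = empty.take();
                for (double[] row : batch)
                    for (int c = 0; c < row.length; c++)
                        row[c] = rng.nextGaussian();
                full.put(batch); // one queue operation per batch, not per row
            }
        } catch (InterruptedException e) {
            // interrupted while waiting on a queue: let the thread end
        }
    }

    /** Returns the next row; it is recycled once its batch is used up, so copy it if you need it longer. */
    double[] nextArray() throws InterruptedException {
        if (current == null || nextRow >= current.length) {
            if (current != null) empty.put(current); // hand the used batch back for refilling
            current = full.take();
            nextRow = 0;
        }
        return current[nextRow++];
    }

    public static void main(String[] args) throws InterruptedException {
        int producers = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
        ParallelGaussianSource source = new ParallelGaussianSource(producers, 2 * producers + 1, 1_000, 50);
        double sum = 0;
        long count = 0;
        for (int i = 0; i < 1_000_000; i++) {
            for (double v : source.nextArray()) { sum += v; count++; }
        }
        System.out.println("mean of " + count + " values: " + sum / count);
    }
}
```

Rows arrive in whatever order the producers finish their batches, which is harmless for independent random values. Keeping the number of batches above the number of producers lets every producer work on a batch while the consumer reads another.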