I have a list containing data to compute for each pixel (e.g. list size = 1024x768). Now I want to iterate multithreaded through the list and save the computation for each pixel in a HashMap. But whatever I do, I can't manage to make it right. I tried several ways, my last was this one:
ConcurrentMap<T, Color> map = new ConcurrentHashMap<T, Color>();
ExecutorService pool = Executors.newFixedThreadPool(4);
Iterator<T> it = camera.iterator();
while (it.hasNext()) {
Runnable run = () -> {
int i = 0;
while (it.hasNext() && i < 1000) {
i++;
T cameraRay = it.next();
if (object.collide(cameraRay.getRay()) == null)
map.put(cameraRay, BG_COLOR);
else
map.put(cameraRay, this.shader.shade(cameraRay.getRay(), object.collide(cameraRay.getRay())).getColor());
}
};
pool.execute(run);
}
pool.shutdown();
try {
if (pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
System.out.println("Mapsize: " + map.size());
// Draw Image:
map.forEach((ray, color) -> {image.setColor(ray, color);});
}
} catch (InterruptedException e) {
e.printStackTrace();
}
Note thet the iterators hasNext()
Method is synchronized.
The Problem is sometimes a heap problem or simply that the size of the HashMap is less than the list size.
I guess that I didn't understand something correct concering Runnables or the ExecutorService.
I appreciate any help here.
EDIT:
I added a System.out.println(i)
just before the i++
statement. Despite of checking for i < 1000
at some point suddenly there appears the following:
507
169
86624
625
626
Exception in thread "pool-2-thread-2" java.lang.OutOfMemoryError: Java heap space
Exception in thread "pool-2-thread-3" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at com.sun.javafx.application.LauncherImpl.launchApplicationWithArgs(Unknown Source)
at com.sun.javafx.application.LauncherImpl.launchApplication(Unknown Source)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at sun.launcher.LauncherHelper$FXHelper.main(Unknown Source)
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.concurrent.ThreadPoolExecutor.addWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
at raytracer.impl.ParallelRenderer.render(ParallelRenderer.java:78)
at raytracer.ImageViewer.main(ImageViewer.java:118)
... 11 more
Exception in thread "pool-2-thread-4" java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
at raytracer.impl.TriangleImpl.collide(TriangleImpl.java:87)
at raytracer.impl.SimpleScene.collide(SimpleScene.java:27)
at raytracer.impl.ParallelRenderer.lambda$0(ParallelRenderer.java:71)
at raytracer.impl.ParallelRenderer$$Lambda$48/24559708.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
EDIT 2: According to the answer of Warkst, I tried the following
Iterator<T> it = camera.iterator();
List<T> buffer = new ArrayList<T>(1000);
while (it.hasNext()) {
buffer.add(it.next());
if (buffer.size() >= 1000 || !it.hasNext()) {
Runnable run = () -> {
for (T cameraRay : buffer) {
if (object.collide(cameraRay.getRay()) == null) // No collision
map.put(cameraRay, BG_COLOR);
else
map.put(cameraRay, this.shader.shade(cameraRay.getRay(), object.collide(cameraRay.getRay())).getColor());
}
};
pool.execute(run);
buffer.clear();
}
}
But the very strange is, that the Runnable
block is never entered now, why?
What confuses me is that your runnables all use the same iterator. What surprises me even more is that you spawn runnables while iterating over the iterator, but that those runnables ALSO manipulate the iterator. This code can (and will, as proven by your question) lead to a bunch of race conditions and the consequent headaches.
I would suggest the following:
Assuming your processing of the data is (significantly) slower than iterating once over the camera, this should do the trick. If that's not the case, there's really no reason to be multithreading.
update 2
I've updated my code sample to something that works:
public static void main(String[] args) throws InterruptedException {
ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
ExecutorService pool = Executors.newFixedThreadPool(4);
Iterator<Integer> it = getIt();
Task t = new Task(map);
while (it.hasNext()) {
t.add(it.next());
if (t.size()>=1000 || !it.hasNext()) {
pool.submit(t);
t = new Task(map);
}
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS);
// Breakpoint here to inspect map
System.out.println("Done!");
}
With
private static Iterator<Integer> getIt(){
return new Iterator<Integer>() {
private int nr = 0;
@Override
public boolean hasNext() {
return nr < 20000;
}
@Override
public Integer next() {
return nr++;
}
};
}
And
private static class Task extends ArrayList<Integer> implements Runnable{
private final ConcurrentHashMap<Integer, String> map;
public Task(ConcurrentHashMap<Integer, String> map) {
this.map = map;
}
@Override
public void run() {
try{
for (Integer i : this) {
// Simulate some crunching: write to the stdout in 10 iterations for each number: 10 000 prints for each Runnable
for (int j = 0; j < 10; j++) {
System.out.println("Iteration "+j+" for "+i);
}
// Store something in the map, namely that this Integer, or T in your case, has been processed
map.put(i, "Done");
}
} catch(Exception e){
e.printStackTrace();
}
}
}
The breakpoint is hit after about 20-30 seconds and the map contains all Integers paired with the String "Done".
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With