Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Executorservice and Runnable

Tags:

java

I have a list containing data to compute for each pixel (e.g. list size = 1024x768). Now I want to iterate multithreaded through the list and save the computation for each pixel in a HashMap. But whatever I do, I can't manage to make it right. I tried several ways, my last was this one:

        ConcurrentMap<T, Color> map = new ConcurrentHashMap<T, Color>();

        ExecutorService pool = Executors.newFixedThreadPool(4);

        Iterator<T> it = camera.iterator();
        while (it.hasNext()) {
            Runnable run = () -> {
                int i = 0;
                while (it.hasNext() && i < 1000) {
                    i++;
                    T cameraRay = it.next();
                    if (object.collide(cameraRay.getRay()) == null)
                        map.put(cameraRay, BG_COLOR);
                    else
                        map.put(cameraRay, this.shader.shade(cameraRay.getRay(), object.collide(cameraRay.getRay())).getColor());
                }
            };
            pool.execute(run);
        }
        pool.shutdown();
        try {
            if (pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
                System.out.println("Mapsize: " + map.size());
                // Draw Image:
                map.forEach((ray, color) -> {image.setColor(ray, color);});
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

Note thet the iterators hasNext() Method is synchronized. The Problem is sometimes a heap problem or simply that the size of the HashMap is less than the list size.

I guess that I didn't understand something correct concering Runnables or the ExecutorService.

I appreciate any help here.

EDIT: I added a System.out.println(i) just before the i++ statement. Despite of checking for i < 1000 at some point suddenly there appears the following:

507
169
86624
625
626
Exception in thread "pool-2-thread-2" java.lang.OutOfMemoryError: Java heap space
Exception in thread "pool-2-thread-3" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at com.sun.javafx.application.LauncherImpl.launchApplicationWithArgs(Unknown Source)
    at com.sun.javafx.application.LauncherImpl.launchApplication(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at sun.launcher.LauncherHelper$FXHelper.main(Unknown Source)
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.concurrent.ThreadPoolExecutor.addWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
    at raytracer.impl.ParallelRenderer.render(ParallelRenderer.java:78)
    at raytracer.ImageViewer.main(ImageViewer.java:118)
    ... 11 more
Exception in thread "pool-2-thread-4" java.lang.OutOfMemoryError: Java heap space
java.lang.OutOfMemoryError: Java heap space
    at raytracer.impl.TriangleImpl.collide(TriangleImpl.java:87)
    at raytracer.impl.SimpleScene.collide(SimpleScene.java:27)
    at raytracer.impl.ParallelRenderer.lambda$0(ParallelRenderer.java:71)
    at raytracer.impl.ParallelRenderer$$Lambda$48/24559708.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

EDIT 2: According to the answer of Warkst, I tried the following

Iterator<T> it = camera.iterator();
List<T> buffer = new ArrayList<T>(1000);
while (it.hasNext()) {
    buffer.add(it.next());
    if (buffer.size() >= 1000 || !it.hasNext()) {
        Runnable run = () -> {
            for (T cameraRay : buffer) {
                if (object.collide(cameraRay.getRay()) == null) // No collision
                    map.put(cameraRay, BG_COLOR);
                else
                    map.put(cameraRay, this.shader.shade(cameraRay.getRay(), object.collide(cameraRay.getRay())).getColor());
            }
        };
        pool.execute(run);
        buffer.clear();
    }
}

But the very strange is, that the Runnable block is never entered now, why?

like image 670
Rafael Wörner Avatar asked Oct 20 '22 15:10

Rafael Wörner


1 Answers

What confuses me is that your runnables all use the same iterator. What surprises me even more is that you spawn runnables while iterating over the iterator, but that those runnables ALSO manipulate the iterator. This code can (and will, as proven by your question) lead to a bunch of race conditions and the consequent headaches.

I would suggest the following:

  1. Get the camera iterator
  2. Make an empty buffer
  3. Read the first x (e.g. 1000) samples from the iterator into the buffer
  4. Create a runnable with the buffer, which will do some work with its 1000 entries
  5. Submit the runnable to the service and go back to 2. Repeat until iterator no longer has next.

Assuming your processing of the data is (significantly) slower than iterating once over the camera, this should do the trick. If that's not the case, there's really no reason to be multithreading.

update 2

I've updated my code sample to something that works:

public static void main(String[] args) throws InterruptedException {
    ConcurrentHashMap<Integer, String> map = new ConcurrentHashMap<>();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    Iterator<Integer> it = getIt();
    Task t = new Task(map);
    while (it.hasNext()) {
        t.add(it.next());
        if (t.size()>=1000 || !it.hasNext()) {
            pool.submit(t);
            t = new Task(map);
        }
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.DAYS);

    // Breakpoint here to inspect map
    System.out.println("Done!");
}

With

private static Iterator<Integer> getIt(){
    return new Iterator<Integer>() {

        private int nr = 0;

        @Override
        public boolean hasNext() {
            return nr < 20000;
        }

        @Override
        public Integer next() {
            return nr++;
        }
    };
}

And

private static class Task extends ArrayList<Integer> implements Runnable{
    private final ConcurrentHashMap<Integer, String> map;

    public Task(ConcurrentHashMap<Integer, String> map) {
        this.map = map;
    }

    @Override
    public void run() {
        try{
            for (Integer i : this) {
                // Simulate some crunching: write to the stdout in 10 iterations for each number: 10 000 prints for each Runnable
                for (int j = 0; j < 10; j++) {
                    System.out.println("Iteration "+j+" for "+i);
                }
                // Store something in the map, namely that this Integer, or T in your case, has been processed
                map.put(i, "Done");
            }
        } catch(Exception e){
            e.printStackTrace();
        }
    }
}

The breakpoint is hit after about 20-30 seconds and the map contains all Integers paired with the String "Done".

Debug results

like image 141
RDM Avatar answered Oct 22 '22 08:10

RDM