ForkJoinPool - Why is the program throwing OutOfMemoryError?

I wanted to try out ForkJoinPool in Java 8, so I wrote a small program that searches a given directory for all files whose names contain a specific keyword.

Program:

public class DirectoryService {

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask("./DIR");
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        List<String> files = pool.invoke(task);
        pool.shutdown();
        System.out.println("Total no of files with hello: " + files.size());
    }

}

    class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
        private String path;
        public FileSearchRecursiveTask(String path) {
            this.path = path;
        }

        @Override
        protected List<String> compute() {
            File mainDirectory = new File(path);
            List<String> filetedFileList = new ArrayList<>();
            List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
            if(mainDirectory.isDirectory()) {
                System.out.println(Thread.currentThread() + " - Directory is " + mainDirectory.getName());
                if(mainDirectory.canRead()) {
                    File[] fileList = mainDirectory.listFiles();
                    for(File file : fileList) {
                        System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
                        if(file.isDirectory()) {
                            FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
                            recursiveTasks.add(task);
                            task.fork();
                        } else {
                            if (file.getName().contains("hello")) {
                                System.out.println(file.getName());
                                filetedFileList.add(file.getName());
                            }
                        }
                    }
                }

                for(FileSearchRecursiveTask task : recursiveTasks) {
                  filetedFileList.addAll(task.join());
                }

        }
        return filetedFileList;

    }
}

This program works fine when the directory doesn't have too many sub-directories and files, but if it is really big, the program throws an OutOfMemoryError.

My understanding is that the maximum number of threads (including compensation threads) is bounded, so why does this error occur? Am I missing anything in my program?

Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1486)
at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2020)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2057)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.tryRemoveAndExec(ForkJoinPool.java:1107)
at java.util.concurrent.ForkJoinPool.awaitJoin(ForkJoinPool.java:2046)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:390)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at FileSearchRecursiveTask.compute(DirectoryService.java:51)
at FileSearchRecursiveTask.compute(DirectoryService.java:20)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:94)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)   
Asked by Atul on Aug 02 '18 at 10:08


2 Answers

You should not fork new tasks without limit. Fork only as long as there’s a chance that another worker thread can pick up the forked job; otherwise, evaluate locally. Also, once you have forked a task, don’t call join() right afterwards. The underlying framework will start compensation threads to ensure that your jobs make progress instead of having all threads blocked waiting for a sub-task, but this can create so many threads that the system’s capabilities are exceeded, which is what the “unable to create new native thread” error indicates.

Here is a revised version of your code:

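import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.RecursiveTask;

public class DirectoryService {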

    public static void main(String[] args) {
        FileSearchRecursiveTask task = new FileSearchRecursiveTask(new File("./DIR"));
        List<String> files = task.invoke();
        System.out.println("Total no of files with hello " + files.size());
    }

}

class FileSearchRecursiveTask extends RecursiveTask<List<String>> {
    private static final int TARGET_SURPLUS = 3;
    private File path;
    public FileSearchRecursiveTask(File file) {
        this.path = file;
    }

    @Override
    protected List<String> compute() {
        File directory = path;
        if(directory.isDirectory() && directory.canRead()) {
            System.out.println(Thread.currentThread() + " - Directory is " + directory.getName());
            return scan(directory);
        }
        return Collections.emptyList();
    }

    private List<String> scan(File directory)
    {
        File[] fileList = directory.listFiles();
        if(fileList == null || fileList.length == 0) return Collections.emptyList();
        List<FileSearchRecursiveTask> recursiveTasks = new ArrayList<>();
        List<String> filteredFileList = new ArrayList<>();
        for(File file: fileList) {
            System.out.println(Thread.currentThread() + "Looking into:" + file.getAbsolutePath());
            if(file.isDirectory())
            {
                if(getSurplusQueuedTaskCount() < TARGET_SURPLUS)
                {
                    FileSearchRecursiveTask task = new FileSearchRecursiveTask(file);
                    recursiveTasks.add(task);
                    task.fork();
                }
                else filteredFileList.addAll(scan(file));
            }
            else if(file.getName().contains("hello")) {
                filteredFileList.add(file.getAbsolutePath());
            }
        }

        for(int ix = recursiveTasks.size() - 1; ix >= 0; ix--) {
            FileSearchRecursiveTask task = recursiveTasks.get(ix);
            if(task.tryUnfork()) task.complete(scan(task.path));
        }

        for(FileSearchRecursiveTask task: recursiveTasks) {
            filteredFileList.addAll(task.join());
        }
        return filteredFileList;
    }
}

The method doing the processing has been factored out into a method receiving the directory as parameter, so we are able to use it locally for arbitrary directories not necessarily being associated with a FileSearchRecursiveTask instance.

Then, the method uses getSurplusQueuedTaskCount() to estimate how many more tasks the current worker thread has queued locally than there are other worker threads that might steal them. Keeping a few such tasks queued helps load balancing, since idle workers then have something to pick up. But once this number reaches the threshold, sub-directories are processed locally without forking more jobs.

After the local processing, it iterates over the tasks and uses tryUnfork() to identify jobs which have not been stolen by other worker threads, and processes them locally. Iterating backwards, starting with the youngest jobs, raises the chances of finding some, because tryUnfork() typically succeeds only for the most recently forked task that has not started executing elsewhere.

Only afterwards does it join() all sub-jobs, which are by then either completed or currently being processed by another worker thread.
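The same fork / tryUnfork / join pattern can be tried without touching the file system. The following is only a minimal sketch under stated assumptions: Node, NodeCountTask and NodeCountDemo are hypothetical names introduced here for illustration, and the in-memory tree merely stands in for a directory hierarchy. The ForkJoinTask calls themselves (fork(), tryUnfork(), complete(), join(), getSurplusQueuedTaskCount()) are the ones used in the revised code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical in-memory tree standing in for a directory hierarchy.
final class Node {
    final List<Node> children = new ArrayList<>();

    // Builds a complete tree of the given depth where every inner node has fanOut children.
    static Node build(int depth, int fanOut) {
        Node node = new Node();
        if (depth > 0) {
            for (int i = 0; i < fanOut; i++) {
                node.children.add(build(depth - 1, fanOut));
            }
        }
        return node;
    }
}

// Counts all nodes of a tree, forking only while the local surplus is small.
final class NodeCountTask extends RecursiveTask<Long> {
    private static final int TARGET_SURPLUS = 3; // same threshold as in the answer
    private final Node root;

    NodeCountTask(Node root) {
        this.root = root;
    }

    @Override
    protected Long compute() {
        return count(root);
    }

    private static long count(Node node) {
        long total = 1; // the node itself
        List<NodeCountTask> forked = new ArrayList<>();
        for (Node child : node.children) {
            if (getSurplusQueuedTaskCount() < TARGET_SURPLUS) {
                NodeCountTask task = new NodeCountTask(child);
                forked.add(task);
                task.fork();            // make it available to other workers
            } else {
                total += count(child);  // enough queued work, process locally
            }
        }
        // Reclaim tasks nobody has stolen, youngest first, and run them here.
        for (int i = forked.size() - 1; i >= 0; i--) {
            NodeCountTask task = forked.get(i);
            if (task.tryUnfork()) {
                task.complete(count(task.root));
            }
        }
        // Every remaining task is either finished or running on another worker.
        for (NodeCountTask task : forked) {
            total += task.join();
        }
        return total;
    }
}

final class NodeCountDemo {
    public static void main(String[] args) {
        Node tree = Node.build(6, 4);
        long nodes = ForkJoinPool.commonPool().invoke(new NodeCountTask(tree));
        System.out.println("nodes counted: " + nodes);
    }
}
```

Because most sub-tasks are either run locally or reclaimed with tryUnfork(), the final join() calls rarely have to block, which is what keeps the pool from spawning compensation threads.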

Note that I changed the initiating code to use the default pool, i.e. the common pool, instead of creating a new one. By default, this uses “number of CPU cores” minus one worker threads, plus the initiating thread, i.e. the main thread in this example.
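To see what this amounts to on a particular machine, the common pool's target parallelism can be printed next to the processor count. This is just a small sketch; CommonPoolInfo is a hypothetical class name. Note that the value may differ from "cores minus one" if the system property java.util.concurrent.ForkJoinPool.common.parallelism has been set.

```java
import java.util.concurrent.ForkJoinPool;

final class CommonPoolInfo {
    // Target parallelism of the common pool, as reported by the JDK.
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("available processors:    " + cores);
        System.out.println("common pool parallelism: " + commonPoolParallelism());
    }
}
```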

Answered by Holger on Nov 14 '22 at 22:11


Just a minor change is required. You need to specify the parallelism for newWorkStealingPool as follows:

ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool(5);

As per its documentation:

newWorkStealingPool(int parallelism) -> Creates a thread pool that maintains enough threads to support the given parallelism level, and may use multiple queues to reduce contention. The parallelism level corresponds to the maximum number of threads actively engaged in, or available to engage in, task processing. The actual number of threads may grow and shrink dynamically. A work-stealing pool makes no guarantees about the order in which submitted tasks are executed.

As per the attached Java VisualVM screenshot, this parallelism allows the program to work within the specified memory, and it never runs out of memory.
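The effect of the argument on the pool's configuration can be checked directly. The sketch below is an illustration only: WorkStealingPoolInfo is a hypothetical class name, and the cast relies on the same assumption as the question's code, namely that the JDK returns a ForkJoinPool here (the documented return type is only ExecutorService).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

final class WorkStealingPoolInfo {
    // Creates a work-stealing pool and reports the parallelism it targets.
    static int parallelismOf(int requested) {
        // Assumption: the returned ExecutorService is a ForkJoinPool, as in the question.
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool(requested);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("target parallelism: " + parallelismOf(5));
    }
}
```

Keep in mind that this value is a target level. As the quoted documentation says, the actual number of threads may grow and shrink dynamically.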

And one more thing (not sure if it will have any effect):

Change the order in which fork is called and the task is added to list. That is, change

FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
recursiveTasks.add(task);
task.fork();

to

FileSearchRecursiveTask task = new FileSearchRecursiveTask(file.getAbsolutePath());
task.fork();
recursiveTasks.add(task);
Answered by KayV on Nov 15 '22 at 00:11