
Process N slow calculations on M threads in Java

I need to run N slow calculations (where N is fairly large) and would like to run them on M threads, since the calculations spend a good deal of time waiting on IO. I have put together a small example that works well when all calculations succeed. However, if a calculation fails, the desired behavior is to stop processing further calculations. Each successful calculation has already written its result to a database, so I just need to determine which calculation failed and stop the calculations that have not yet started.

My approach is to submit the calculations to an ExecutorService created with Executors.newFixedThreadPool. However, I don't see a clean way to detect that one of the calculations failed (in my example, returned false) and to stop the calculations that have been submitted to the ExecutorService but not yet assigned a thread from the pool.

Is there a clean method to do that? Is there a better approach for me to consider?

import java.util.*;
import java.util.concurrent.*;

// Renamed from "Future" so the class does not shadow java.util.concurrent.Future.
class FutureTest
{
    static private class MyWorker implements Callable<Boolean>
    {   
        private Integer item;
        public MyWorker(Integer item)
        {
            this.item = item;
        }

        public Boolean call() throws InterruptedException
        {
            if (item == 42) 
            {
                return false;
            }
            else
            {
                System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName());
                Thread.sleep(1000);
                return true;
            }
        }   
    }

    static int NTHREADS = 2;

    public static void main(String args[]) 
    {
        Queue<Integer> numbers = new LinkedList<Integer>();     
        for (int i=1; i<10000; i++)
        {
            numbers.add(i);
        }

        System.out.println("Starting thread test.");

        ExecutorService exec = Executors.newFixedThreadPool(NTHREADS);

        for (Integer i : numbers)
        {
            MyWorker my = new MyWorker(i);
            System.out.println("Submit..." + i.toString());
            exec.submit(my);
            System.out.println("... Done Submit");
        }

        exec.shutdown();

        System.out.println("Exiting thread test.");

    }
}

EDIT: Here's a working implementation of afk's suggestion. I still plan to look at the callback solution and hope for other suggestions.

import java.util.*;
import java.util.concurrent.*;

class MyFuture
{
    static private class MyWorker implements Callable<Boolean>
    {   
        private Integer item;
        public MyWorker(Integer item)
        {
            this.item = item;
        }

        public Boolean call() 
        {
            if (item == 42) 
            {
                return false;
            }
            else
            {
                System.out.println("Processing: " + item.toString() + " on " + Thread.currentThread().getName());
                try
                {
                    Thread.sleep(1000);
                }
                catch (InterruptedException ie) 
                { 
                    // shutdownNow() interrupts sleeping workers; restore the
                    // interrupt flag instead of silently swallowing it.
                    Thread.currentThread().interrupt();
                } 
                return true;
            }
        }   
    }

    static int NTHREADS = 4;

    public static void main(String args[]) throws InterruptedException
    {
        Queue<Integer> numbers = new LinkedList<Integer>();     
        for (int i=1; i<100; i++)
        {
            numbers.add(i);
        }

        System.out.println("Starting thread test.");

        ExecutorService exec = Executors.newFixedThreadPool(NTHREADS);

        List<Future<Boolean>> futures = new LinkedList<Future<Boolean>>();

        for (Integer i : numbers)
        {
            MyWorker my = new MyWorker(i);
            System.out.println("Submit..." + i.toString());
            Future<Boolean> f = exec.submit(my);
            futures.add(f);
            System.out.println("... Done Submit");
        }

        boolean done = false;

        while (!done)
        {
            Iterator<Future<Boolean>> it = futures.iterator();

            while (it.hasNext()) 
            {
                Future<Boolean> f = it.next();
                if (f.isDone())
                {
                    try
                    {
                        System.out.println("CHECK RETURN VALUE");
                        if (f.get()) 
                        {
                            it.remove();
                        }
                        else
                        {                   
                            System.out.println("IMMEDIATE SHUTDOWN");
                            exec.shutdownNow();
                            done = true;
                            break;
                        }
                    }
                    catch (InterruptedException ie)
                    {
                    }
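                    catch (ExecutionException ee)
                    {
                        // The task threw an exception: treat it as a failure.
                        // Otherwise this future would never leave the list
                        // and the outer loop would never finish.
                        System.out.println("IMMEDIATE SHUTDOWN");
                        exec.shutdownNow();
                        done = true;
                        break;
                    }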
                }
            }
            Thread.sleep(1000);
            if (futures.size() == 0)
            {
                done = true;
            }
        }

        exec.shutdown();

        System.out.println("Exiting thread test.");

    }
}
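
The polling loop above could also be written without the one-second sleep. The following is a minimal sketch, not the approach from the question, that swaps in java.util.concurrent.ExecutorCompletionService so the main thread blocks until the next task finishes. The class CompletionOrderCheck, the Result holder, and the 10 ms sleep standing in for the slow calculation are all assumptions made for illustration.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderCheck
{
    // Hypothetical result holder so a failure can be traced back to its item.
    static final class Result
    {
        final int item;
        final boolean ok;

        Result(int item, boolean ok)
        {
            this.item = item;
            this.ok = ok;
        }
    }

    // Runs items 1..count and returns the item that failed, or -1 if none did.
    static int runUntilFailure(int count, int failingItem, int threads)
            throws InterruptedException
    {
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        CompletionService<Result> completed = new ExecutorCompletionService<>(exec);

        for (int i = 1; i <= count; i++)
        {
            final int item = i;
            // Stand-in for the slow calculation: only failingItem reports failure.
            completed.submit(() -> {
                Thread.sleep(10);
                return new Result(item, item != failingItem);
            });
        }

        try
        {
            for (int i = 0; i < count; i++)
            {
                // take() blocks until some task has finished, in completion order.
                Result r = completed.take().get();
                if (!r.ok)
                {
                    return r.item;
                }
            }
            return -1;
        }
        catch (ExecutionException ee)
        {
            // A task threw; this sketch cannot tell which item it was.
            return -1;
        }
        finally
        {
            // Drops queued tasks and interrupts running ones on every exit path.
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException
    {
        System.out.println("Failed item: " + runUntilFailure(100, 42, 4));
    }
}
```

As in the code above, tasks that are already running when the failure is seen are interrupted rather than rolled back, so anything they have already written to the database stays there.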
Eric J. asked May 15 '26 13:05


1 Answer

Using a callback, as I outline in another answer, you can be notified of a failure and cancel all submitted jobs. (In my example, the Callback implementation class could hold a reference to a Collection to which each Future was added.) For tasks that have already completed, cancel does nothing; for tasks that have already started, whether they are interrupted depends on the mayInterruptIfRunning argument. The rest will never be started.
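
The idea can be sketched roughly as follows. This is not the code from the other answer, which is not shown here: the FailureCallback interface, the watched wrapper, and the class name CancelOnFailure are hypothetical names for illustration, and a 10 ms sleep stands in for the slow calculation. Only Future.cancel, ExecutorService, and the other java.util.concurrent types are real JDK APIs.

```java
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CancelOnFailure
{
    // Hypothetical callback interface for this sketch; it is not a JDK type.
    interface FailureCallback
    {
        void failed(int item);
    }

    // Wraps a calculation so that a false result is reported to the callback.
    static Callable<Boolean> watched(int item, Callable<Boolean> work, FailureCallback cb)
    {
        return () -> {
            boolean ok = work.call();
            if (!ok)
            {
                cb.failed(item);
            }
            return ok;
        };
    }

    // Runs items 1..count and returns the item that failed, or -1 if none did.
    static int run(int count, int failingItem, int threads) throws InterruptedException
    {
        ExecutorService exec = Executors.newFixedThreadPool(threads);
        Queue<Future<Boolean>> futures = new ConcurrentLinkedQueue<>();
        AtomicInteger failedItem = new AtomicInteger(-1);

        // On failure: record the item, then cancel everything submitted so far.
        // cancel(false) leaves running tasks alone; queued tasks never start.
        FailureCallback cb = failed -> {
            failedItem.compareAndSet(-1, failed);
            for (Future<Boolean> f : futures)
            {
                f.cancel(false);
            }
        };

        for (int i = 1; i <= count; i++)
        {
            final int item = i;
            // Stand-in for the slow calculation: only failingItem returns false.
            Callable<Boolean> work = () -> { Thread.sleep(10); return item != failingItem; };
            Future<Boolean> f = exec.submit(watched(item, work, cb));
            futures.add(f);
            // A failure may arrive while we are still submitting; stop here.
            if (failedItem.get() != -1)
            {
                f.cancel(false);
                break;
            }
        }

        exec.shutdown();
        exec.awaitTermination(1, TimeUnit.MINUTES);
        return failedItem.get();
    }

    public static void main(String[] args) throws InterruptedException
    {
        System.out.println("Failed item: " + run(100, 42, 4));
    }
}
```

The failed item is recorded in the callback rather than read back from its Future, because the callback also cancels the Future of the task that is reporting the failure. Passing true to cancel instead would additionally interrupt tasks that are already running.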

erickson answered May 18 '26 01:05