
How do I cancel tasks that are taking too long using CompletionService?

I submit some tasks using a CompletionService wrapped round a 2-thread FixedThreadPool ExecutorService. I then set up a loop that runs once per submitted task and calls completionservice.take(), waiting for them all to complete or fail. The trouble is that very occasionally it never finishes (I don't know why), so I changed the take() call to poll(300, TimeUnit.SECONDS). The idea is that if one task takes longer than 5 minutes to complete, that poll will time out, the loop will eventually exit, and I can then go through all the futures and call future.cancel(true) to force cancellation of the offending task.

But when I run the code and it hangs, I see the poll time out once every 5 minutes and no more tasks run, so I assume the two workers are deadlocked in some way, never finish, and never allow additional tasks to start. Because the timeout is 5 minutes and there were still 1000 tasks to run, breaking out of the loop would have taken too long, so I cancelled the job.

So what I want to do is interrupt/force cancellation of the current task if it hasn't completed within 5 minutes, but I can't see any way to do it.

This code sample shows a simplified version of what I'm talking about:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceTest
{
    public static void main(final String[] args)
    {
        CompletionService<Boolean>  cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(2));
        Collection<Worker> tasks = new ArrayList<Worker>(10);
        tasks.add(new Worker(1));
        tasks.add(new Worker(2));
        tasks.add(new Worker(3));
        tasks.add(new Worker(4));
        tasks.add(new Worker(5));
        tasks.add(new Worker(6));

        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
        try
        {
            for (Callable<Boolean> task : tasks)
            {
                futures.add(cs.submit(task));
            }
            for (int t = 0; t < futures.size(); t++)
            {
                Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
                if(result==null)
                {
                    System.out.println("Worker TimedOut:");
                    continue;
                }
                else
                {
                    try
                    {
                        if(result.isDone() && result.get())
                        {
                            System.out.println("Worker Completed:");
                        }
                        else
                        {
                            System.out.println("Worker Failed");
                        }
                    }
                    catch (ExecutionException ee)
                    {
                        ee.printStackTrace();
                    }
                }
            }
       }
        catch (InterruptedException ie)
        {
        }
        finally
        {
            //Cancel by interrupting any existing tasks currently running in Executor Service
            for (Future<Boolean> f : futures)
            {
                f.cancel(true);
            }
        }
        System.out.println("Done");
    }
}

class Worker implements Callable<Boolean>
{
    private int number;
    public Worker(int number)
    {
        this.number=number;
    }

    public Boolean call()
    {
        if(number==3)
        {
            try
            {
                Thread.sleep(50000);
            }
            catch(InterruptedException tie)
            {

            }
        }
        return true;
    }
}

Output

Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker TimedOut:
Done
Paul Taylor asked Mar 10 '11 14:03


2 Answers

I think I've solved it. Basically, if a timeout occurs I iterate through my list of Future objects, find the first one that has not completed, and force its cancellation. It doesn't seem that elegant, but it seems to work.

I've changed the size of the pool to 1 just so the output better demonstrates the solution, but it works with a 2-thread pool as well.

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceTest
{
    public static void main(final String[] args)
    {
        CompletionService<Boolean>  cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(1));
        Collection<Worker> tasks = new ArrayList<Worker>(10);
        tasks.add(new Worker(1));
        tasks.add(new Worker(2));
        tasks.add(new Worker(3));
        tasks.add(new Worker(4));
        tasks.add(new Worker(5));
        tasks.add(new Worker(6));

        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
        try
        {
            for (Callable<Boolean> task : tasks)
            {
                futures.add(cs.submit(task));
            }
            for (int t = 0; t < futures.size(); t++)
            {
                System.out.println("Invocation:"+t);
                Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
                if(result==null)
                {
                    System.out.println(new Date()+":Worker Timedout:");
                    //So let's cancel the first future we find that hasn't completed
                    for(Future<Boolean> future:futures)
                    {
                        System.out.println("Checking future");
                        if(future.isDone())
                        {
                            continue;
                        }
                        else
                        {
                            future.cancel(true);
                            System.out.println("Cancelled");
                            break;
                        }
                    }
                    continue;
                }
                else
                {
                    try
                    {
                        if(result.isDone() && !result.isCancelled() && result.get())
                        {
                            System.out.println(new Date()+":Worker Completed:");
                        }
                        else if(result.isDone() && !result.isCancelled() && !result.get())
                        {
                            System.out.println(new Date()+":Worker Failed");
                        }
                    }
                    catch (ExecutionException ee)
                    {
                        ee.printStackTrace(System.out);
                    }
                }
            }
       }
        catch (InterruptedException ie)
        {
        }
        finally
        {
            //Cancel by interrupting any existing tasks currently running in Executor Service
            for (Future<Boolean> f : futures)
            {
                f.cancel(true);
            }
        }
        System.out.println(new Date()+":Done");
    }
}

class Worker implements Callable<Boolean>
{
    private int number;
    public Worker(int number)
    {
        this.number=number;
    }

    public Boolean call()
        throws InterruptedException
    {
        try
        {
            if(number==3)
            {
                Thread.sleep(50000);
            }
        }
        catch(InterruptedException ie)
        {
            System.out.println("Worker Interuppted");
            throw ie;
        }
        return true;
    }
}

Output is

Invocation:0
Thu Mar 10 20:51:39 GMT 2011:Worker Completed:
Invocation:1
Thu Mar 10 20:51:39 GMT 2011:Worker Completed:
Invocation:2
Thu Mar 10 20:51:49 GMT 2011:Worker Timedout:
Checking future
Checking future
Checking future
Cancelled
Invocation:3
Worker Interuppted
Invocation:4
Thu Mar 10 20:51:49 GMT 2011:Worker Completed:
Invocation:5
Thu Mar 10 20:51:49 GMT 2011:Worker Completed:
Thu Mar 10 20:51:49 GMT 2011:Done
Paul Taylor answered Nov 15 '22 11:11


In your worker example, your Callable is blocking on a call that supports interruption. If your real code is deadlocking on an intrinsic lock (a synchronized block), you won't be able to cancel it via interruption. Instead, you can use an explicit lock (java.util.concurrent.locks.Lock), whose tryLock method lets you specify how long you want to wait for lock acquisition. If a thread times out waiting for a lock, possibly because it has encountered a deadlock situation, it can abort with an error message.
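A minimal sketch of the timed-lock idea follows. The class name TimedLockWorker and the ReentrantLock held by main are assumptions for illustration only, standing in for whatever the real task contends on; they are not taken from the question's code.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical worker: waits a bounded time for a lock instead of blocking forever
class TimedLockWorker implements Callable<Boolean>
{
    private final Lock lock;
    private final long timeout;
    private final TimeUnit unit;

    TimedLockWorker(Lock lock, long timeout, TimeUnit unit)
    {
        this.lock = lock;
        this.timeout = timeout;
        this.unit = unit;
    }

    public Boolean call() throws InterruptedException
    {
        // tryLock(timeout, unit) returns false once the timeout elapses,
        // and throws InterruptedException if the thread is interrupted while waiting
        if (!lock.tryLock(timeout, unit))
        {
            System.out.println("Could not acquire lock in time, aborting task");
            return false;
        }
        try
        {
            // The real work would go here
            return true;
        }
        finally
        {
            lock.unlock();
        }
    }
}

public class TimedLockDemo
{
    public static void main(final String[] args) throws Exception
    {
        Lock lock = new ReentrantLock();
        lock.lock(); // main holds the lock, so the worker cannot get it
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try
        {
            Future<Boolean> future =
                pool.submit(new TimedLockWorker(lock, 200, TimeUnit.MILLISECONDS));
            System.out.println("Worker result: " + future.get());
        }
        finally
        {
            lock.unlock();
            pool.shutdown();
        }
    }
}
```

Because main never releases the lock while the worker is waiting, the worker gives up after 200 milliseconds and returns false rather than hanging. Unlike a thread blocked entering a synchronized block, a thread waiting in tryLock with a timeout also responds to future.cancel(true).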

By the way, in your example, your Callable shouldn't swallow InterruptedException. You should either pass it up (rethrow it, or add InterruptedException to the throws clause of your method declaration and not catch it at all), or reset the interrupted state of the thread in the catch block with Thread.currentThread().interrupt().
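As a rough illustration of the second option, here is a sketch of a Callable that restores the interrupted state instead of swallowing the exception. The class names InterruptAwareWorker and InterruptDemo are made up for this example, and the 50 second sleep simply mirrors the question's Worker.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical worker that does not declare InterruptedException,
// so it restores the interrupt flag instead of rethrowing
class InterruptAwareWorker implements Callable<Boolean>
{
    public Boolean call()
    {
        try
        {
            Thread.sleep(50000);
            return true;
        }
        catch (InterruptedException ie)
        {
            // Thread.sleep cleared the flag when it threw; set it again
            // so callers further up the stack can still see the interrupt
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

public class InterruptDemo
{
    public static void main(final String[] args) throws Exception
    {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<Boolean> future = pool.submit(new InterruptAwareWorker());
        Thread.sleep(200); // give the worker a moment to start sleeping
        System.out.println("Cancel accepted: " + future.cancel(true));
        pool.shutdown();
        System.out.println("Pool terminated: " + pool.awaitTermination(5, TimeUnit.SECONDS));
    }
}
```

If call() can declare throws InterruptedException, as Callable allows, simply not catching the exception is the shorter route; restoring the flag is mainly useful where the method signature cannot be changed.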

sk. answered Nov 15 '22 13:11