I submit some Future tasks using a CompletionService wrapped round a 2-thread FixedThreadPool ExecutorService. I then set up a loop that runs once per submitted task and calls completionservice.take(), waiting for them all to complete or fail. Trouble is, very occasionally it never finishes (and I don't know why), so I changed the take() call to poll(300, TimeUnit.SECONDS). The idea is that if one task takes longer than 5 minutes to complete, that poll will time out, the loop will eventually exit, and I can then go through all the futures and call future.cancel(true) to force cancellation of the offending task.
But when I run the code and it hangs, I see the poll time out once every 5 minutes and no more tasks run, so I assume the two workers are deadlocked in some way, never finish, and never allow additional tasks to start. Because the timeout is 5 minutes and there were still 1000 tasks to run, breaking out of the loop would have taken too long, so I cancelled the job.
So what I want to do is interrupt/force cancellation of the current task if it hasn't completed within 5 minutes, but I can't see any way to do it.
This code sample shows a simplified version of what I'm talking about:
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceTest
{
    public static void main(final String[] args)
    {
        CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(2));
        Collection<Worker> tasks = new ArrayList<Worker>(10);
        tasks.add(new Worker(1));
        tasks.add(new Worker(2));
        tasks.add(new Worker(3));
        tasks.add(new Worker(4));
        tasks.add(new Worker(5));
        tasks.add(new Worker(6));
        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
        try
        {
            for (Callable<Boolean> task : tasks)
            {
                futures.add(cs.submit(task));
            }
            for (int t = 0; t < futures.size(); t++)
            {
                // Wait up to 10 seconds for the next finished task; null means the wait timed out
                Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
                if (result == null)
                {
                    System.out.println("Worker TimedOut:");
                    continue;
                }
                else
                {
                    try
                    {
                        if (result.isDone() && result.get())
                        {
                            System.out.println("Worker Completed:");
                        }
                        else
                        {
                            System.out.println("Worker Failed");
                        }
                    }
                    catch (ExecutionException ee)
                    {
                        ee.printStackTrace();
                    }
                }
            }
        }
        catch (InterruptedException ie)
        {
        }
        finally
        {
            // Cancel by interrupting any existing tasks currently running in Executor Service
            for (Future<Boolean> f : futures)
            {
                f.cancel(true);
            }
        }
        System.out.println("Done");
    }
}

class Worker implements Callable<Boolean>
{
    private int number;

    public Worker(int number)
    {
        this.number = number;
    }

    public Boolean call()
    {
        // Worker 3 simulates a task that takes far longer than the poll timeout
        if (number == 3)
        {
            try
            {
                Thread.sleep(50000);
            }
            catch (InterruptedException tie)
            {
            }
        }
        return true;
    }
}
Output
Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker Completed:
Worker TimedOut:
Done
I think I've solved it. Basically, if a timeout occurs, I iterate through my list of Future objects, find the first one that has not completed, and force its cancellation. It doesn't seem that elegant, but it seems to work.
I've changed the size of the pool just to produce output that better demonstrates the solution, but it works with a 2-thread pool as well.
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceTest
{
    public static void main(final String[] args)
    {
        CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(Executors.newFixedThreadPool(1));
        Collection<Worker> tasks = new ArrayList<Worker>(10);
        tasks.add(new Worker(1));
        tasks.add(new Worker(2));
        tasks.add(new Worker(3));
        tasks.add(new Worker(4));
        tasks.add(new Worker(5));
        tasks.add(new Worker(6));
        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
        try
        {
            for (Callable<Boolean> task : tasks)
            {
                futures.add(cs.submit(task));
            }
            for (int t = 0; t < futures.size(); t++)
            {
                System.out.println("Invocation:" + t);
                Future<Boolean> result = cs.poll(10, TimeUnit.SECONDS);
                if (result == null)
                {
                    System.out.println(new Date() + ":Worker Timedout:");
                    // So let's cancel the first future we find that hasn't completed
                    for (Future<Boolean> future : futures)
                    {
                        System.out.println("Checking future");
                        if (future.isDone())
                        {
                            continue;
                        }
                        else
                        {
                            future.cancel(true);
                            System.out.println("Cancelled");
                            break;
                        }
                    }
                    continue;
                }
                else
                {
                    try
                    {
                        // A cancelled future also arrives here; calling get() on it would throw CancellationException
                        if (result.isDone() && !result.isCancelled() && result.get())
                        {
                            System.out.println(new Date() + ":Worker Completed:");
                        }
                        else if (result.isDone() && !result.isCancelled() && !result.get())
                        {
                            System.out.println(new Date() + ":Worker Failed");
                        }
                    }
                    catch (ExecutionException ee)
                    {
                        ee.printStackTrace(System.out);
                    }
                }
            }
        }
        catch (InterruptedException ie)
        {
        }
        finally
        {
            // Cancel by interrupting any existing tasks currently running in Executor Service
            for (Future<Boolean> f : futures)
            {
                f.cancel(true);
            }
        }
        System.out.println(new Date() + ":Done");
    }
}

class Worker implements Callable<Boolean>
{
    private int number;

    public Worker(int number)
    {
        this.number = number;
    }

    public Boolean call()
        throws InterruptedException
    {
        try
        {
            if (number == 3)
            {
                Thread.sleep(50000);
            }
        }
        catch (InterruptedException ie)
        {
            // Report the interruption, then rethrow so the task ends instead of returning true
            System.out.println("Worker Interuppted");
            throw ie;
        }
        return true;
    }
}
Output is
Invocation:0
Thu Mar 10 20:51:39 GMT 2011:Worker Completed:
Invocation:1
Thu Mar 10 20:51:39 GMT 2011:Worker Completed:
Invocation:2
Thu Mar 10 20:51:49 GMT 2011:Worker Timedout:
Checking future
Checking future
Checking future
Cancelled
Invocation:3
Worker Interuppted
Invocation:4
Thu Mar 10 20:51:49 GMT 2011:Worker Completed:
Invocation:5
Thu Mar 10 20:51:49 GMT 2011:Worker Completed:
Thu Mar 10 20:51:49 GMT 2011:Done
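One thing to note about the loop above: a poll that times out still uses up one iteration, and the cancelled Future later arrives on the completion queue and uses up another. That appears to be why the output shows only four "Worker Completed" lines even though five of the six workers could have finished. A minimal sketch of one way around that is to count only the futures actually retrieved. The class name CountingCompletionLoop, the method runAll and its parameters are hypothetical and not part of the original code, and the sketch assumes the stuck task responds to interruption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

// Hypothetical helper, for illustration only
class CountingCompletionLoop
{
    // Runs all tasks and returns how many completed normally with a true result
    static int runAll(List<Callable<Boolean>> tasks, int threads, long timeout, TimeUnit unit)
        throws InterruptedException
    {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(pool);
        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(tasks.size());
        int succeeded = 0;
        try
        {
            for (Callable<Boolean> task : tasks)
            {
                futures.add(cs.submit(task));
            }
            int remaining = futures.size();
            while (remaining > 0)
            {
                Future<Boolean> result = cs.poll(timeout, unit);
                if (result == null)
                {
                    // Timed out: cancel the first unfinished future, but do not count this pass
                    boolean cancelledOne = false;
                    for (Future<Boolean> f : futures)
                    {
                        if (!f.isDone())
                        {
                            f.cancel(true);
                            cancelledOne = true;
                            break;
                        }
                    }
                    if (!cancelledOne)
                    {
                        // Nothing left to cancel and nothing arriving: give up rather than loop forever
                        break;
                    }
                    continue;
                }
                remaining--;
                if (result.isCancelled())
                {
                    continue;
                }
                try
                {
                    if (result.get())
                    {
                        succeeded++;
                    }
                }
                catch (ExecutionException ee)
                {
                    ee.printStackTrace(System.out);
                }
            }
        }
        finally
        {
            // Interrupt anything still running and let the pool threads exit
            pool.shutdownNow();
        }
        return succeeded;
    }
}
```

With six tasks where only the third one sleeps, one pool thread and a short timeout, this sketch should report five successes, since the cancelled task is retrieved from the queue but not counted as a success.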
In your worker example, your Callable is blocking on a call that supports interruption. If your real code is deadlocking on an intrinsic lock (a synchronized block), you won't be able to cancel it via interruption. Instead, you can use an explicit lock (java.util.concurrent.locks.Lock), which allows you to specify how long you want to wait for lock acquisition. If a thread times out waiting for a lock, possibly because it has encountered a deadlock situation, it can abort with an error message.
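As a rough sketch of the timed lock acquisition described above, a worker could use Lock.tryLock with a timeout and fail with an exception if the lock is not obtained in time. The class name TimedLockWorker, its constructor parameters and the choice of TimeoutException are assumptions made for illustration; the real code would use its own lock and error handling.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;

// Hypothetical worker, for illustration only
class TimedLockWorker implements Callable<Boolean>
{
    private final Lock lock;
    private final long timeoutSeconds;

    TimedLockWorker(Lock lock, long timeoutSeconds)
    {
        this.lock = lock;
        this.timeoutSeconds = timeoutSeconds;
    }

    public Boolean call() throws InterruptedException, TimeoutException
    {
        // Unlike entering a synchronized block, tryLock gives up after the timeout
        // and throws InterruptedException if the thread is interrupted while waiting
        if (!lock.tryLock(timeoutSeconds, TimeUnit.SECONDS))
        {
            throw new TimeoutException("Could not acquire lock within " + timeoutSeconds + " seconds");
        }
        try
        {
            // ... the work that needs the lock would go here ...
            return true;
        }
        finally
        {
            // Always release the lock, even if the work throws
            lock.unlock();
        }
    }
}
```

When such a worker is submitted to the executor, the TimeoutException would surface as the cause of the ExecutionException thrown by Future.get(), so the polling loop can log it and move on.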
By the way, in your example, your Callable shouldn't swallow InterruptedException. You should either pass it up (rethrow, or add InterruptedException to the throws line of your method declaration), or in the catch block, reset the interrupted state of the thread (with Thread.currentThread().interrupt()).
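A small sketch of the second option, resetting the interrupted state in the catch block, could look like the following. The class name InterruptAwareWorker and the sleepMillis parameter are made up for this example, and returning false on interruption is only one possible choice.

```java
import java.util.concurrent.Callable;

// Hypothetical worker, for illustration only
class InterruptAwareWorker implements Callable<Boolean>
{
    private final long sleepMillis;

    InterruptAwareWorker(long sleepMillis)
    {
        this.sleepMillis = sleepMillis;
    }

    public Boolean call()
    {
        try
        {
            // Stands in for any blocking call that supports interruption
            Thread.sleep(sleepMillis);
            return true;
        }
        catch (InterruptedException ie)
        {
            // sleep() clears the interrupted flag when it throws, so set it again
            // to let code further up the call stack see that an interrupt happened
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The first option, declaring throws InterruptedException on call() and letting the exception propagate, is what the second version of Worker in the question's follow-up already does.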