
Java - Define a timeout for Callable within an ExecutorCompletionService

I've got the following problem using ExecutorCompletionService. I want to call a lot of Callables in different threads. These Callables don't share any information with each other. I need to define a timeout for each Callable, e.g. it must not run longer than 5 seconds. Each Callable takes a different amount of time, which I don't know when starting it. After the timeout the thread should be stopped/killed, and I'm no longer interested in its result. The other, normally running threads should not be influenced.

So let's take the following example with a simple Callable and my current Java code.

import java.util.Date;
import java.util.concurrent.Callable;

public class Job implements Callable<Integer> {

    int returnValue = 0;
    long millis = 0;

    public Job(long millis, int value) {
        this.millis = millis;
        this.returnValue = value;
    }

    @Override
    public Integer call() throws Exception, InterruptedException {
        try {
            System.out.println(new Date() + " " + returnValue + " started");
            Thread.sleep(millis);
            System.out.println(new Date() + " " + returnValue + " finished");
            return returnValue;
        } catch (InterruptedException e) {
            System.out.println(new Date() + " " + returnValue + " interrupted");
            throw e;
        }        
    }
}

And the other class, where the Callable is used:

import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.*;

public class CallableTest {

    public static void main(String[] args) {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(newFixedThreadPool);
        
        for (int i = 10; i > 0; i--) {
            Job job = new Job(i * 1000, i);
            pool.submit(job);
        }
        
        ArrayList<Integer> results = new ArrayList<Integer>();
        for (int i = 1; i < 11; ++i) {
            try {
                Future<Integer> future = pool.take();
                Integer content = future.get(5, TimeUnit.SECONDS);
                results.add(content);
                System.out.println(new Date() + " added " + content);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        newFixedThreadPool.shutdownNow();

        System.out.println(new Date() + " results:");
        for (int j : results) {
            System.out.println(new Date() + " " + j);
        }
    }
}

The output is something like:

Sun Jun 29 08:01:00 CEST 2014 10 started
Sun Jun 29 08:01:00 CEST 2014 9 started
Sun Jun 29 08:01:09 CEST 2014 9 finished
Sun Jun 29 08:01:09 CEST 2014 added 9
Sun Jun 29 08:01:09 CEST 2014 8 started
Sun Jun 29 08:01:10 CEST 2014 10 finished
Sun Jun 29 08:01:10 CEST 2014 7 started
Sun Jun 29 08:01:10 CEST 2014 added 10
Sun Jun 29 08:01:17 CEST 2014 7 finished
Sun Jun 29 08:01:17 CEST 2014 6 started
Sun Jun 29 08:01:17 CEST 2014 added 7
Sun Jun 29 08:01:17 CEST 2014 8 finished
Sun Jun 29 08:01:17 CEST 2014 added 8
Sun Jun 29 08:01:17 CEST 2014 5 started
Sun Jun 29 08:01:22 CEST 2014 5 finished
Sun Jun 29 08:01:22 CEST 2014 added 5
Sun Jun 29 08:01:22 CEST 2014 4 started
Sun Jun 29 08:01:23 CEST 2014 6 finished
Sun Jun 29 08:01:23 CEST 2014 3 started
Sun Jun 29 08:01:23 CEST 2014 added 6
Sun Jun 29 08:01:26 CEST 2014 3 finished
Sun Jun 29 08:01:26 CEST 2014 2 started
Sun Jun 29 08:01:26 CEST 2014 added 3
Sun Jun 29 08:01:26 CEST 2014 4 finished
Sun Jun 29 08:01:26 CEST 2014 1 started
Sun Jun 29 08:01:26 CEST 2014 added 4
Sun Jun 29 08:01:27 CEST 2014 1 finished
Sun Jun 29 08:01:27 CEST 2014 added 1
Sun Jun 29 08:01:28 CEST 2014 2 finished
Sun Jun 29 08:01:28 CEST 2014 added 2
Sun Jun 29 08:01:28 CEST 2014 results:
Sun Jun 29 08:01:28 CEST 2014 9
Sun Jun 29 08:01:28 CEST 2014 10
Sun Jun 29 08:01:28 CEST 2014 7
Sun Jun 29 08:01:28 CEST 2014 8
Sun Jun 29 08:01:28 CEST 2014 5
Sun Jun 29 08:01:28 CEST 2014 6
Sun Jun 29 08:01:28 CEST 2014 3
Sun Jun 29 08:01:28 CEST 2014 4
Sun Jun 29 08:01:28 CEST 2014 1
Sun Jun 29 08:01:28 CEST 2014 2 

That is not what I want. Each Callable running longer than 5 seconds should be terminated/interrupted, and only the Callables running less than 5 seconds should give me a valid result.

I also tried it without the ExecutorCompletionService:

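import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class CallableTest2 {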
    public static void main(String[] args) {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);
        List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
        
        for (int i = 10; i > 0; i--) {
            Job job = new Job(i * 1000, i);
            futures.add(newFixedThreadPool.submit(job));
        }
        
        ArrayList<Integer> results = new ArrayList<Integer>();
        for (Future<Integer> future: futures) {
            try {
                Integer content = future.get(5, TimeUnit.SECONDS);
                results.add(content);
                System.out.println(new Date() + " added " + content);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        newFixedThreadPool.shutdownNow();

        System.out.println(new Date() + " results:");
        for (int j : results) {
            System.out.println(new Date() + " " + j);
        }
    }
}

With the results:

Sun Jun 29 08:33:19 CEST 2014 9 started
Sun Jun 29 08:33:19 CEST 2014 10 started
java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
    at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    at callabletest.CallableTest2.main(CallableTest2.java:29)
Sun Jun 29 08:33:28 CEST 2014 9 finished
Sun Jun 29 08:33:28 CEST 2014 8 started
Sun Jun 29 08:33:28 CEST 2014 added 9
Sun Jun 29 08:33:29 CEST 2014 10 finished
Sun Jun 29 08:33:29 CEST 2014 7 started
java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
    at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    at callabletest.CallableTest2.main(CallableTest2.java:29)
Sun Jun 29 08:33:36 CEST 2014 7 finished
Sun Jun 29 08:33:36 CEST 2014 added 7
Sun Jun 29 08:33:36 CEST 2014 6 started
Sun Jun 29 08:33:36 CEST 2014 8 finished
Sun Jun 29 08:33:36 CEST 2014 5 started
java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask$Sync.innerGet(Sun Jun 29 08:33:41 CEST 2014 5 finished
FutureTask.java:228)
Sun Jun 29 08:33:41 CEST 2014 added 5
    at java.util.concurrent.FutureTask.get(FutureTask.java:91)
Sun Jun 29 08:33:41 CEST 2014 4 started
    at callabletest.CallableTest2.main(CallableTest2.java:29)
Sun Jun 29 08:33:42 CEST 2014 6 finished
Sun Jun 29 08:33:42 CEST 2014 3 started
Sun Jun 29 08:33:45 CEST 2014 3 finished
Sun Jun 29 08:33:45 CEST 2014 2 started
Sun Jun 29 08:33:45 CEST 2014 4 finished
Sun Jun 29 08:33:45 CEST 2014 added 4
Sun Jun 29 08:33:45 CEST 2014 added 3
Sun Jun 29 08:33:45 CEST 2014 1 started
Sun Jun 29 08:33:46 CEST 2014 1 finished
Sun Jun 29 08:33:47 CEST 2014 2 finished
Sun Jun 29 08:33:47 CEST 2014 added 2
Sun Jun 29 08:33:47 CEST 2014 added 1
Sun Jun 29 08:33:47 CEST 2014 results:
Sun Jun 29 08:33:47 CEST 2014 9
Sun Jun 29 08:33:47 CEST 2014 7
Sun Jun 29 08:33:47 CEST 2014 5
Sun Jun 29 08:33:47 CEST 2014 4
Sun Jun 29 08:33:47 CEST 2014 3
Sun Jun 29 08:33:47 CEST 2014 2
Sun Jun 29 08:33:47 CEST 2014 1

Now I get some TimeoutExceptions, but not where I expect them. E.g. the Callables running 9 and 7 seconds do not throw an exception!

What do I have to change in the code to get only the results of the short-running threads and kill the long-running ones? In the example, that means only the results 1-5, without 6-10.

I've tested a lot of things but I can't get it to work. Please help.


This is a reply to bstar55's post, which suggested using a ScheduledExecutorService.

Following your hint, I changed my code to:

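import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class CallableTest3 {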

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
        
        for (int i = 10; i > 0; i--) {
            Job job = new Job(i * 1000, i);
            final Future<Integer> handler = executor.submit(job);
            final int x = i;
            executor.schedule(new Runnable() {

                public void run() {
                    boolean cancel = handler.cancel(true);
                    if(cancel){
                        System.out.println(new Date() + " job " + x + " cancelled");
                    }else{
                        System.out.println(new Date() + " job " + x + " not cancelled");
                    }
                }
            }, 5000, TimeUnit.MILLISECONDS);
            futures.add(handler);
        }

        ArrayList<Integer> results = new ArrayList<Integer>();
        for (Future<Integer> future : futures) {
            try {
                Integer content = future.get(5, TimeUnit.SECONDS);
                results.add(content);
                System.out.println(new Date() + " added " + content);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        executor.shutdown();

        System.out.println(new Date() + " results:");
        for (int j : results) {
            System.out.println(new Date() + " --- " + j);
        }
    }
}

But this also does not work as expected. Result:

Sun Jun 29 10:27:41 CEST 2014 9 started
Sun Jun 29 10:27:41 CEST 2014 10 started
java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
    at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    at callabletest.CallableTest3.main(CallableTest3.java:43)
Sun Jun 29 10:27:50 CEST 2014 9 finished
Sun Jun 29 10:27:50 CEST 2014 added 9
Sun Jun 29 10:27:50 CEST 2014 8 started
Sun Jun 29 10:27:51 CEST 2014 10 finished
Sun Jun 29 10:27:51 CEST 2014 7 started
java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
    at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    at callabletest.CallableTest3.main(CallableTest3.java:43)
Sun Jun 29 10:27:58 CEST 2014 8 finished
Sun Jun 29 10:27:58 CEST 2014 6 started
Sun Jun 29 10:27:58 CEST 2014 7 finished
Sun Jun 29 10:27:58 CEST 2014 5 started
Sun Jun 29 10:27:58 CEST 2014 added 7
Sun Jun 29 10:28:03 CEST 2014 5 finished
Sun Jun 29 10:28:03 CEST 2014 4 started
java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:228)
    at java.util.concurrent.FutureTask.get(FutureTask.java:91)
Sun Jun 29 10:28:03 CEST 2014 added 5
    at callabletest.CallableTest3.main(CallableTest3.java:43)
Sun Jun 29 10:28:04 CEST 2014 6 finished
Sun Jun 29 10:28:04 CEST 2014 3 started
Sun Jun 29 10:28:07 CEST 2014 3 finished
Sun Jun 29 10:28:07 CEST 2014 2 started
Sun Jun 29 10:28:07 CEST 2014 4 finished
Sun Jun 29 10:28:07 CEST 2014 added 4
Sun Jun 29 10:28:07 CEST 2014 added 3
Sun Jun 29 10:28:07 CEST 2014 1 started
java.util.concurrent.CancellationException
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:230)
    at java.util.concurrent.FutureTask.get(FutureTask.java:91)
    at callabletest.CallableTest3.main(CallableTest3.java:43)
Sun Jun 29 10:28:08 CEST 2014 1 finished
Sun Jun 29 10:28:08 CEST 2014 job 10 not cancelled
Sun Jun 29 10:28:08 CEST 2014 job 9 not cancelled
Sun Jun 29 10:28:08 CEST 2014 job 8 not cancelled
Sun Jun 29 10:28:08 CEST 2014 job 7 not cancelled
Sun Jun 29 10:28:08 CEST 2014 job 6 not cancelled
Sun Jun 29 10:28:08 CEST 2014 job 5 not cancelled
Sun Jun 29 10:28:08 CEST 2014 job 4 not cancelled
Sun Jun 29 10:28:08 CEST 2014 job 3 not cancelled
Sun Jun 29 10:28:08 CEST 2014 2 interrupted
Sun Jun 29 10:28:08 CEST 2014 job 1 not cancelled
Sun Jun 29 10:28:08 CEST 2014 added 1
Sun Jun 29 10:28:08 CEST 2014 results:
Sun Jun 29 10:28:08 CEST 2014 --- 9
Sun Jun 29 10:28:08 CEST 2014 --- 7
Sun Jun 29 10:28:08 CEST 2014 --- 5
Sun Jun 29 10:28:08 CEST 2014 --- 4
Sun Jun 29 10:28:08 CEST 2014 --- 3
Sun Jun 29 10:28:08 CEST 2014 --- 1
Sun Jun 29 10:28:08 CEST 2014 job 2 cancelled

Instead, only job 2 was cancelled!


user3787186 asked Jun 29 '14 06:06




1 Answer

I suggest dividing your problem into two separate ones:

  1. run on multiple threads
  2. use a timeout for each operation

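For the first (multithreading), you already use an executor service that manages this with 2 threads: Executors.newFixedThreadPool(2). A timeout applied at that level, with future.get(5, TimeUnit.SECONDS) in the main thread, is measured from the call to get() and not from the moment the job starts, so it also covers the time a job spends waiting behind the other tasks. What you need is a timeout for each job.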
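To illustrate why a timeout on get() in the main thread is not a per-job limit, here is a minimal sketch. The class GetTimeoutDemo and its helper methods are hypothetical names made up for this illustration, and the durations are arbitrary. It shows two things: a short job that is queued behind another one can time out in get(), and a job that has already finished never times out, no matter how long it ran (which is what happens after CompletionService.take(), since take() only hands out completed futures).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class GetTimeoutDemo {

    // Hypothetical stand-in for the Job class: sleeps, then returns a value.
    static Callable<String> sleeper(long millis) {
        return () -> {
            Thread.sleep(millis);
            return "done";
        };
    }

    /** True if get(timeout) on a job queued behind another job times out. */
    static boolean queuedJobTimesOut(long jobMillis, long timeoutMillis) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(sleeper(jobMillis));                          // occupies the only thread
            Future<String> queued = pool.submit(sleeper(jobMillis));  // has to wait its turn
            try {
                queued.get(timeoutMillis, TimeUnit.MILLISECONDS);     // the clock starts here
                return false;
            } catch (TimeoutException e) {
                return true;
            }
        } finally {
            pool.shutdownNow();
        }
    }

    /** Calls get(1 ms) on a job that has already finished, however long it ran. */
    static String getAfterCompletion(long jobMillis) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(sleeper(jobMillis));
            while (!future.isDone()) {
                Thread.sleep(10);
            }
            return future.get(1, TimeUnit.MILLISECONDS); // returns at once, no timeout
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Each job runs only 400 ms, but the queued one needs about 800 ms in total.
        System.out.println("queued job timed out: " + queuedJobTimesOut(400, 600));
        System.out.println("finished job returned: " + getAfterCompletion(400));
    }
}
```

In the first method the job itself stays well under the limit, yet get() gives up because the wait includes the time in the queue. This matches the TimeoutExceptions appearing in unexpected places in your second attempt.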

For the timeout issue, you can handle it by using a new executor service per job, inside a wrapper class: JobManager.

package com.stackoverflow.q24473796;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class JobManager implements Callable<Integer> {

    protected long timeout;
    protected TimeUnit timeUnit;
    protected Callable<Integer> job;

    public JobManager(long timeout, TimeUnit timeUnit, Callable<Integer> job) {
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.job = job;
    }

    @Override
    public Integer call() {
        Integer result = -1; // default, this could be adapted
        ExecutorService exec = Executors.newSingleThreadExecutor();

        try {
            result = exec.submit(job).get(timeout, timeUnit);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            // Whatever you want
            if (e instanceof TimeoutException) {
                System.out.println("Timeout get for " + job.toString());
            } else {
                System.out.println("exception get for " + job.toString() + " : " + e.getMessage());
            }
        }
        // shutdown() does not interrupt a job that is still running
        exec.shutdown();
        return result;
    }
}

Then, you can call the tasks from your main thread as follows:

    Job job = new Job(i * 1000, i);
    Future<Integer> future = newFixedThreadPool.submit(new JobManager(5, TimeUnit.SECONDS, job));

I adapted your CallableTest:

package com.stackoverflow.q24473796;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CallableTest {

    public static void main(String[] args) {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(2);

        List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
        for (int i = 10; i > 0; i--) {
            Job job = new Job(i * 1000, i);
            Future<Integer> future = newFixedThreadPool.submit(new JobManager(5, TimeUnit.SECONDS, job));
            futures.add(future);
        }

        ArrayList<Integer> results = new ArrayList<Integer>();
        for (Future<Integer> future : futures) {
            Integer result = -1; // same default as in JobManager
            try {
                result = future.get();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            if (result != -1) {
                results.add(result);
            }
        }

        newFixedThreadPool.shutdown();

        try {
            newFixedThreadPool.awaitTermination(60, TimeUnit.SECONDS); //Global Timeout
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(new Date() + " results:");
        for (int j : results) {
            System.out.println(new Date() + " " + j);
        }
    }
}

And you'll get the following output:

Wed Apr 29 10:51:02 CEST 2015 10 started
Wed Apr 29 10:51:02 CEST 2015 9 started
Timeout get for com.stackoverflow.q24473796.Job@249fe45c
Timeout get for com.stackoverflow.q24473796.Job@249fe45c
Wed Apr 29 10:51:07 CEST 2015 8 started
Wed Apr 29 10:51:07 CEST 2015 7 started
Wed Apr 29 10:51:11 CEST 2015 9 finished
Timeout get for com.stackoverflow.q24473796.Job@3cd4c5a0
Timeout get for com.stackoverflow.q24473796.Job@3cd4c5a0
Wed Apr 29 10:51:12 CEST 2015 6 started
Wed Apr 29 10:51:12 CEST 2015 5 started
Wed Apr 29 10:51:12 CEST 2015 10 finished
Wed Apr 29 10:51:14 CEST 2015 7 finished
Wed Apr 29 10:51:15 CEST 2015 8 finished
Wed Apr 29 10:51:17 CEST 2015 5 finished
Wed Apr 29 10:51:17 CEST 2015 4 started
Timeout get for com.stackoverflow.q24473796.Job@2a0fded2
Wed Apr 29 10:51:17 CEST 2015 3 started
Wed Apr 29 10:51:18 CEST 2015 6 finished
Wed Apr 29 10:51:20 CEST 2015 3 finished
Wed Apr 29 10:51:20 CEST 2015 2 started
Wed Apr 29 10:51:21 CEST 2015 4 finished
Wed Apr 29 10:51:21 CEST 2015 1 started
Wed Apr 29 10:51:22 CEST 2015 1 finished
Wed Apr 29 10:51:22 CEST 2015 2 finished
Wed Apr 29 10:51:22 CEST 2015 results:
Wed Apr 29 10:51:22 CEST 2015 5
Wed Apr 29 10:51:22 CEST 2015 4
Wed Apr 29 10:51:22 CEST 2015 3
Wed Apr 29 10:51:22 CEST 2015 2
Wed Apr 29 10:51:22 CEST 2015 1
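
Note that in the output above the jobs 6 to 10 still print "finished" after their timeout: JobManager stops waiting for them, but ExecutorService.shutdown() does not interrupt a task that is already running. If the long-running jobs should really be interrupted, the wrapper can cancel the inner future on timeout. The following is only a sketch under assumptions: TimedJobDemo, TimedJob and sleepingJob are hypothetical names, the fallback value -1 follows the convention used above, and the durations are arbitrary. It also submits the wrappers through an ExecutorCompletionService, so results arrive in completion order.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimedJobDemo {

    /** Hypothetical wrapper: runs a job with a time limit and interrupts it on timeout. */
    static class TimedJob<T> implements Callable<T> {
        private final long timeout;
        private final TimeUnit unit;
        private final Callable<T> job;
        private final T fallback; // returned when the job times out or fails

        TimedJob(long timeout, TimeUnit unit, Callable<T> job, T fallback) {
            this.timeout = timeout;
            this.unit = unit;
            this.job = job;
            this.fallback = fallback;
        }

        @Override
        public T call() throws InterruptedException {
            ExecutorService exec = Executors.newSingleThreadExecutor();
            Future<T> future = exec.submit(job);
            try {
                // The clock starts when a pool thread picks up this wrapper.
                return future.get(timeout, unit);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupts the thread running the job
                return fallback;
            } catch (ExecutionException e) {
                return fallback;     // the job itself threw an exception
            } finally {
                exec.shutdownNow();  // release the helper thread in every case
            }
        }
    }

    /** Sleeps for the given time; counts down the latch if it gets interrupted. */
    static Callable<Integer> sleepingJob(long millis, int value, CountDownLatch interrupted) {
        return () -> {
            try {
                Thread.sleep(millis);
                return value;
            } catch (InterruptedException e) {
                interrupted.countDown();
                throw e;
            }
        };
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<Integer> completion = new ExecutorCompletionService<>(pool);
        CountDownLatch slowInterrupted = new CountDownLatch(1);

        completion.submit(new TimedJob<Integer>(300, TimeUnit.MILLISECONDS,
                sleepingJob(2000, 10, slowInterrupted), -1));
        completion.submit(new TimedJob<Integer>(300, TimeUnit.MILLISECONDS,
                sleepingJob(50, 1, new CountDownLatch(1)), -1));

        for (int i = 0; i < 2; i++) {
            System.out.println("result: " + completion.take().get());
        }
        System.out.println("slow job was interrupted: "
                + slowInterrupted.await(1, TimeUnit.SECONDS));
        pool.shutdown();
    }
}
```

Keep in mind that cancel(true) only delivers an interrupt. A job that blocks in Thread.sleep, as in the example, reacts to it at once, but a job that never checks its interrupt status will keep running until it finishes on its own.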

Loic Mouchard answered Sep 20 '22 09:09