Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

future.isDone returns false even if the task is done

I have tricky situation, Does future.isDone() returns false, even if the thread is done.

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DataAccessor {
    private static ThreadPoolExecutor executor;
    private int timeout = 100000;
    static {
        executor = new ThreadPoolExecutor(10, 10, 1000, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000));
    }

    public static void main(String[] args) {
        List<String> requests = new ArrayList<String>();
        for(int i=0; i<20; i++){
            requests.add("request:"+i);
        }
        DataAccessor dataAccessor = new DataAccessor();

        List<ProcessedResponse> results = dataAccessor.getDataFromService(requests);
        for(ProcessedResponse response:results){
            System.out.println("response"+response.toString()+"\n");
        }
        executor.shutdown();
    }

    public List<ProcessedResponse> getDataFromService(List<String> requests) {
        final CountDownLatch latch = new CountDownLatch(requests.size());
        List<SubmittedJob> submittedJobs = new ArrayList<SubmittedJob>(requests.size());
        for (String request : requests) {
            Future<ProcessedResponse> future = executor.submit(new GetAndProcessResponse(request, latch));
            submittedJobs.add(new SubmittedJob(future, request));
        }
        try {
            if (!latch.await(timeout, TimeUnit.MILLISECONDS)) {
                // some of the jobs not done
                System.out.println("some jobs not done");
            }
        } catch (InterruptedException e1) {
            // take care, or cleanup
            for (SubmittedJob job : submittedJobs) {
                job.getFuture().cancel(true);
            }
        }
        List<ProcessedResponse> results = new LinkedList<DataAccessor.ProcessedResponse>();
        for (SubmittedJob job : submittedJobs) {
            try {
                // before doing a get you may check if it is done
                if (!job.getFuture().isDone()) {
                    // cancel job and continue with others
                    job.getFuture().cancel(true);
                    continue;
                }
                ProcessedResponse response = job.getFuture().get();
                results.add(response);
            } catch (ExecutionException cause) {
                // exceptions occurred during execution, in any
            } catch (InterruptedException e) {
                // take care
            }
        }
        return results;
    }

    private class SubmittedJob {
        final String request;
        final Future<ProcessedResponse> future;

        public Future<ProcessedResponse> getFuture() {
            return future;
        }

        public String getRequest() {
            return request;
        }

        SubmittedJob(final Future<ProcessedResponse> job, final String request) {
            this.future = job;
            this.request = request;
        }
    }

    private class ProcessedResponse {
        private final String request;
        private final String response;

        ProcessedResponse(final String request, final String response) {
            this.request = request;
            this.response = response;
        }

        public String getRequest() {
            return request;
        }

        public String getResponse() {
            return response;
        }

        public String toString(){
            return "[request:"+request+","+"response:"+ response+"]";
        }
    }

    private class GetAndProcessResponse implements Callable<ProcessedResponse> {
        private final String request;
        private final CountDownLatch countDownLatch;

        GetAndProcessResponse(final String request, final CountDownLatch countDownLatch) {
            this.request = request;
            this.countDownLatch = countDownLatch;
        }

        public ProcessedResponse call() {
            try {
                return getAndProcessResponse(this.request);
            } finally {
                countDownLatch.countDown();
            }
        }

        private ProcessedResponse getAndProcessResponse(final String request) {
            // do the service call
            // ........
            if("request:16".equals(request)){
                throw (new RuntimeException("runtime"));
            }
            return (new ProcessedResponse(request, "response.of." + request));
        }
    }
}

if I call future.isDone() it returns false though the coundownLatch.await() return true. Any Idea? Also to note that the countDownLatch.await comes out immediately when this happens.

If you are finding the format not readable view here, http://tinyurl.com/7j6cvep .

like image 489
yadab Avatar asked Mar 07 '12 15:03

yadab


People also ask

How to make a thread return a result when it terminates?

There are two ways of creating threads – one by extending the Thread class and other by creating a thread with a Runnable. However, one feature lacking in Runnable is that we cannot make a thread return result when it terminates, i.e. when run () completes. For supporting this feature, the Callable interface is present in Java.

How to stop the execution of a future?

We can use Future.cancel (boolean) to tell the executor to stop the operation and interrupt its underlying thread: Our instance of Future, from the code above, will never complete its operation.

How to get the result of a future in Java?

To obtain the result, a Future is required. The Java library has the concrete type FutureTask, which implements Runnable and Future, combining both functionality conveniently. A FutureTask can be created by providing its constructor with a Callable. Then the FutureTask object is provided to the constructor of Thread to create the Thread object.

What are forkjointask and recursiveaction?

There are two abstract classes that implement ForkJoinTask: RecursiveTask, which returns a value upon completion, and RecursiveAction, which doesn't return anything. As their names imply, these classes are to be used for recursive tasks, such as file-system navigation or complex mathematical computation.


2 Answers

The issue is most likely one of timing. the latch will be released before all of the tasks are actually complete with regards to the Future (because the countDown() invocation is within the call() method).

you are basically recreating the work of a CompletionService (implementation is ExecutorCompletionService), i would recommend using that instead. you can use the poll(timeout) method to get the results. just keep track of the total time and make sure you reduce your timeout on each call to the total remaining time.

like image 101
jtahlborn Avatar answered Sep 25 '22 06:09

jtahlborn


As jtahlborn mentioned this is probably a race condition in which the CountdownLatch signals its waiting threads, which the waiting threads evaluates the Future's cancel condition before the FutureTask finishes its execution (which will occur at some point after the countDown).

You simply cannot rely on the synchronization mechanisms of the CountdownLatch to be in sync with the sync mechanisms of a Future. What you should do is rely on the Future to tell you when it is done.

You can Future.get(long timeout, TimeUnit.MILLISECONDS) instead of CountdownLatch.await(long timeout, TimeUnit.MILLISECONDS). To get the same type of effect as the latch you can add all the Futures to a List, iterate over the list and get on each Future.

like image 39
John Vint Avatar answered Sep 25 '22 06:09

John Vint