Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Java ExecutorService invokeAll() interrupting

I have a fixed thread pool ExecutorService of width 10, and a list of 100 Callable's, each waiting for 20 seconds and recording their interrupts.

I'm calling invokeAll on that list in a separate thread, and almost immediately interrupting this thread. ExecutorService execution is interrupted as expected, but the actual number of interrupts recorded by Callables is far more than expected 10 - around 20-40. Why is that so, if ExecutorService can execute no more than 10 threads simultaneously?

Full source: (You may need to run it more that once due to concurrency)

@Test
public void interrupt3() throws Exception{
    int callableNum = 100;
    int executorThreadNum = 10;
    final AtomicInteger interruptCounter = new AtomicInteger(0);
    final ExecutorService executorService = Executors.newFixedThreadPool(executorThreadNum);
    final List <Callable <Object>> executeds = new ArrayList <Callable <Object>>();
    for (int i = 0; i < callableNum; ++i) {
        executeds.add(new Waiter(interruptCounter));
    }
    Thread watcher = new Thread(new Runnable() {

        @Override
        public void run(){
            try {
                executorService.invokeAll(executeds);
            } catch(InterruptedException ex) {
                // NOOP
            }
        }
    });
    watcher.start();
    Thread.sleep(200);
    watcher.interrupt();
    Thread.sleep(200);
    assertEquals(10, interruptCounter.get());
}

// This class just waits for 20 seconds, recording it's interrupts
private class Waiter implements Callable <Object> {
    private AtomicInteger    interruptCounter;

    public Waiter(AtomicInteger interruptCounter){
        this.interruptCounter = interruptCounter;
    }

    @Override
    public Object call() throws Exception{
        try {
            Thread.sleep(20000);
        } catch(InterruptedException ex) {
            interruptCounter.getAndIncrement();
        }
        return null;
    }
}

Using WinXP 32-bit, Oracle JRE 1.6.0_27 and JUnit4

like image 538
Alex Abdugafarov Avatar asked Nov 02 '11 06:11

Alex Abdugafarov


1 Answers

I disagree with the hypothesis that you should only receive 10 interrupts.

Assume the CPU has 1 core.
1. Main thread starts Watcher and sleeps
2. Watcher starts and adds 100 Waiters then blocks
3. Waiter 1-10 start and sleep in sequence
4. Main wakes and interrupts Watcher then sleeps
5. Watcher cancels Waiter 1-5 then is yielded by the OS   (now we have 5 interrupts)
6. Waiter 11-13 start and sleep
7. Watcher cancels Waiter 6-20 then is yielded by the OS   (now we have 13 interrupts)
8. Waiter 14-20 are "started" resulting in a no-op
9. Waiter 21-24 start and sleep
....

Essentially, my argument is that there is no guarantee that the Watcher thread will be allowed to cancel all 100 "Waiter" RunnableFuture instances before it has to yield the time slice and allow the ExecutorService's worker threads to start more Waiter tasks.

Update: Showing code from AbstractExecutorService

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException {
    if (tasks == null)
        throw new NullPointerException();
    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
    boolean done = false;
    try {
        for (Callable<T> t : tasks) {
            RunnableFuture<T> f = newTaskFor(t);
            futures.add(f);
            execute(f);
        }
        for (Future<T> f : futures) {
            if (!f.isDone()) {
                try {
                    f.get(); //If interrupted, this is where the InterruptedException will be thrown from
                } catch (CancellationException ignore) {
                } catch (ExecutionException ignore) {
                }
            }
        }
        done = true;
        return futures;
    } finally {
        if (!done)
            for (Future<T> f : futures)
                f.cancel(true); //Specifying "true" is what allows an interrupt to be sent to the ExecutorService's worker threads
    }
}

The finally block which contains f.cancel(true) is when the interrupt would be propagated to task which is currently running. As you can see, this is a tight loop, but there is no guarantee that the thread executing the loop would be able to iterate through all instances of Future in one time slice.

like image 53
Tim Bender Avatar answered Sep 28 '22 21:09

Tim Bender