
Preventing possible deadlock scenarios in bounded executor services with producers and consumers in Java

Consider this sample code: (I have simplified the classes a lot, so they are easier to read)

The producer

class RandomIntegerProducer implements Callable<Void>
{
    private final BlockingQueue<? super Integer> queue;
    private final Random random;

    /* Boilerplate constructor... */

    @Override
    public Void call()
    {
        while (!Thread.interrupted())
        {
            try {
                TimeUnit.SECONDS.sleep(1);
                queue.put(random.nextInt());
            } catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return null;
    }
}

This is a simple, concise sample of a task that puts a random number into a queue every second and is cancellable with Thread.interrupt().

The consumer

class NumberConsumer implements Callable<Void>
{
    private final BlockingQueue<? extends Number> queue;
    private final Appendable target;

    /* Boilerplate constructor... */

    @Override
    public Void call() throws IOException
    {
        while (!Thread.interrupted())
        {
            try {
                target.append(queue.take().toString());
            } catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return null;
    }
}

The consumer takes numbers from the queue and appends them to the specified Appendable. It can be cancelled via Thread.interrupt().

The starting code

class ProducerConsumerStarter
{
    /* Notice this is a fixed-size (i.e. bounded) executor service */
    private static final ExecutorService SERVICE = Executors.newFixedThreadPool(8);

    public static List<Future<Void>> startIntegerProducerConsumer(int producers, int consumers)
    {
        List<Callable<Void>> callables = new ArrayList<>();
        BlockingQueue<Integer> commonQueue = new ArrayBlockingQueue<>(16);
        for (int i = 0; i < producers; i++)
        {
            callables.add(new RandomIntegerProducer(commonQueue, new Random()));
        }
        for (int i = 0; i < consumers; i++)
        {
            callables.add(new NumberConsumer(commonQueue, System.out));
        }
        // Submit them all (in order)
        return callables.stream().map(SERVICE::submit).collect(Collectors.toList());
    }
}

This utility method submits the tasks to the bounded executor service in order: first all producers, then all consumers.

The client code that fails it

public class FailingExample {
    @org.junit.Test
    public void deadlockApplication() throws Exception
    {
        List<Future<Void>> futures = ProducerConsumerStarter.startIntegerProducerConsumer(10, 10);
        for (Future<Void> future : futures)
        {
            System.out.println("Getting future");
            future.get();
        }
    }
}

This client code breaks the concurrent program: it deadlocks, and so does every later caller of the starting code.


The question is: How can I prevent my application from spawning an insane number of threads under high load (I want the tasks to be queued instead) and still prevent the deadlock that occurs when the executor is filled with producers only?

Even though this sample obviously fails 100% of the time, consider a concurrent program that, in an unlucky situation, completely fills the bounded executor with producers only. You will encounter the same general problem.

randers asked Nov 05 '25 13:11


1 Answer

What is deadlock? From the Java documentation:

Deadlock describes a situation where two or more threads are blocked forever, waiting for each other.

For example, a deadlock happens when a first thread holds monitor1 and tries to acquire monitor2, while a second thread holds monitor2 and tries to acquire monitor1.
By that definition there is no deadlock in your code, because there are no two or more threads waiting for each other. There are producers waiting for space in the queue and no running consumers, because the consumers were never scheduled: all of the executor's threads are occupied by producers.
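To make that starvation visible, here is a minimal sketch of my own (not the asker's code). The class name StarvationDemo, the queue capacity of 1 and the timeouts are assumptions chosen so that the demo finishes quickly. It submits endless producers and then one consumer to a fixed pool, and reports whether the consumer ever got a thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names and sizes are illustrative only.
public class StarvationDemo {

    /**
     * Submits `producers` endless producers and then one consumer to a pool of
     * `poolSize` threads. Returns true if the consumer started within `waitMillis`.
     */
    public static boolean consumerStarted(int poolSize, int producers, long waitMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        CountDownLatch consumerRunning = new CountDownLatch(1);
        try {
            for (int i = 0; i < producers; i++) {
                pool.submit(() -> {
                    try {
                        while (true) {
                            queue.put(42); // blocks as soon as the queue is full
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            pool.submit(() -> {
                consumerRunning.countDown(); // only reached if a thread was free
                try {
                    while (true) {
                        queue.take();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            return consumerRunning.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // interrupts the blocked tasks so the demo terminates
        }
    }

    public static void main(String[] args) {
        // Producers occupy both threads, the consumer stays in the executor's queue.
        System.out.println(consumerStarted(2, 2, 500)); // false
        // One thread is left over, so the consumer runs.
        System.out.println(consumerStarted(3, 2, 500)); // true
    }
}
```

In the first call every pool thread is stuck in put(), and the only task that could free space is still waiting in the executor's own work queue.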

Also, "The client code that fails it" will always block the calling thread, even with startIntegerProducerConsumer(1, 1)

public class FailingExample {
    @org.junit.Test
    public void deadlockApplication() throws Exception
    {
        List<Future<Void>> futures = ProducerConsumerStarter.startIntegerProducerConsumer(10, 10);
        for (Future<Void> future : futures)
        {
            System.out.println("Getting future");
            future.get();
        }
    }
}

because your producers and consumers keep running until they are explicitly interrupted, which never happens in deadlockApplication().

Your code should look like this

for (Future<Void> future : futures)
{
    if (future.isDone()) {
        try {
            System.out.println("Getting future");
            future.get();
        } catch (CancellationException ce) {
            // the task was cancelled, so there is no result to read
        } catch (ExecutionException ee) {
            // the task threw an exception; ee.getCause() holds it
        }
    } else {
        System.out.println("The future is not done, cancelling it");
        if (future.cancel(true)) {
            System.out.println("task was cancelled");
        } else {
            // cancel(true) returned false: the task could not be cancelled,
            // typically because it completed in the meantime
        }
    }
}

This loop gets the results of the completed tasks and cancels the ones that have not completed.
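As a small illustration of what cancel(true) does to a task that never finishes on its own, here is a sketch under my own assumptions. The class name CancelDemo is made up, and the long sleep only stands in for a blocked put() or take().

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the sleeping task stands in for a blocked producer or consumer.
public class CancelDemo {

    /** Cancels a running task that would otherwise never finish and describes the outcome. */
    public static String cancelEndlessTask() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                try {
                    TimeUnit.DAYS.sleep(1);
                } catch (InterruptedException e) {
                    interrupted.countDown(); // cancel(true) interrupted the worker thread
                }
            });
            started.await(); // make sure the task is really running before cancelling
            boolean cancelled = future.cancel(true);
            boolean sawInterrupt = interrupted.await(5, TimeUnit.SECONDS);
            String getResult;
            try {
                future.get();
                getResult = "value";
            } catch (CancellationException e) {
                getResult = "CancellationException";
            } catch (ExecutionException e) {
                getResult = "ExecutionException";
            }
            return "cancelled=" + cancelled + ", interrupted=" + sawInterrupt + ", get=" + getResult;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "caller interrupted";
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(cancelEndlessTask());
    }
}
```

Calling get() on a cancelled future throws CancellationException instead of blocking, which is why the loop above checks isDone() and catches that exception.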

@vanOekel is right, it's better to have two thread pools, one for consumers and another for producers.
Like this

class ProducerConsumerStarter
{
    private static final ExecutorService CONSUMERS = Executors.newFixedThreadPool(8); 
    private static final ExecutorService PRODUCERS = Executors.newFixedThreadPool(8); 

    public static List<Future<Void>> startIntegerProducerConsumer(int producers, int consumers) {
        ...
    }
}

with startIntegerProducerConsumer(int, int) submitting the producers and the consumers to their respective pools.
But even in this case, newly submitted tasks will be queued and won't start until previously submitted producers or consumers have finished, which never happens unless those tasks are interrupted.
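A rough sketch of the two-pool idea follows. It is not a completion of the elided method above; the class TwoPoolsDemo, the pool sizes, the queue capacity and the timeout are all assumptions for illustration. Producers can fill their own pool completely and the consumer still makes progress.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; sizes are illustrative only.
public class TwoPoolsDemo {

    /** Starts `producers` endless producers and one consumer that takes `expected` values. */
    public static int consumeWithSeparatePools(int producers, int expected) {
        ExecutorService producerPool = Executors.newFixedThreadPool(2);
        ExecutorService consumerPool = Executors.newFixedThreadPool(2);
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        try {
            for (int i = 0; i < producers; i++) {
                producerPool.submit(() -> {
                    try {
                        while (true) {
                            queue.put(1); // may block, but only ever on a producer thread
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // The consumer has its own pool, so producers cannot starve it.
            Future<Integer> consumed = consumerPool.submit(() -> {
                int count = 0;
                while (count < expected) {
                    queue.take();
                    count++;
                }
                return count;
            });
            return consumed.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            producerPool.shutdownNow();
            consumerPool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Four producers compete for two producer threads; the consumer still runs.
        System.out.println(consumeWithSeparatePools(4, 100));
    }
}
```

The caveat from above still applies within each pool: the third and fourth producer here stay queued for as long as the first two keep running.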

You can also go further and optimize your producer's code. First, change the producer to

class RandomIntegerProducer implements Runnable
{
    private final BlockingQueue<? super Integer> queue;
    private final Random random;
    ...
    @Override
    public void run()
    {
        queue.offer(random.nextInt());
    }
}

Then submit the producers to a ScheduledExecutorService using scheduleWithFixedDelay(producer, 1, 1, TimeUnit.SECONDS). Each run now returns immediately, so a producer no longer holds a thread while it sleeps or waits for space in the queue, and producers do not block each other. It also slightly changes the application's semantics: offer() does not wait when the queue is full, it returns false and the number is dropped.
You can keep the ScheduledExecutorService (for producers) in a class variable. The only inconvenience is that you will have to change the return type of the startIntegerProducerConsumer(int producers, int consumers) method to List<Future<?>>, because scheduleWithFixedDelay(..) returns a ScheduledFuture<?>, which is a Future<?> rather than a Future<Void>. It can still be cancelled like any other Future. You can do the same with the consumers, if it is acceptable for you that a newly generated number may wait up to one delay period (the delay passed into scheduleWithFixedDelay()) before it is consumed.
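Here is a minimal sketch of the scheduled variant, with assumptions of my own: the class name ScheduledProducerDemo is made up, and the delay is 10 milliseconds instead of 1 second only so that the demo finishes quickly.

```java
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the short delay is an assumption to keep the run brief.
public class ScheduledProducerDemo {

    /** Schedules a non-blocking producer and returns how many values were received. */
    public static int produceFor(int wanted) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(16);
        Random random = new Random();
        try {
            // Each run only calls offer(), so it never blocks a scheduler thread.
            ScheduledFuture<?> handle = scheduler.scheduleWithFixedDelay(
                    () -> queue.offer(random.nextInt()), 10, 10, TimeUnit.MILLISECONDS);
            int received = 0;
            while (received < wanted) {
                Integer value = queue.poll(5, TimeUnit.SECONDS);
                if (value == null) {
                    break; // nothing arrived in time, give up
                }
                received++;
            }
            handle.cancel(false); // stops further runs of the periodic task
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(produceFor(5));
    }
}
```

Because the producer body is a single offer() call, one scheduler thread can serve many producers, which is the point of this change.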

I hope my answer helped a bit.

dezhik answered Nov 08 '25 11:11