
ExecutorService with backpressure

I have the following requirements for an ExecutorService:

  • limited number of concurrently processing tasks (threads), preferably configurable
  • when all threads are occupied, subsequently submitted tasks get queued.
  • the queue should apply backpressure: if too many tasks are submitted and the queue is full, the submission should block.
  • the blocking submission can have a timeout: if a task is not submitted within a given time, an exception should be thrown.

Reason: the producers of the tasks may overwhelm the executor service or its queue, increasing memory demand if too many tasks are queued. Backpressure blocks the producers in such a situation until processing can catch up. The producers (some reading from Kafka, others reading from JDBC) may have open transactions which can time out, so a blocking submission should be able to time out as well.

As executor service implementations such as ThreadPoolExecutor use a BlockingQueue to enqueue tasks, I would have expected the queue to offer exactly this feature: queue up to a defined number of tasks, then block until worker threads pull tasks from the queue. Instead, it turned out that when the queue is full and the pool already runs its maximum number of threads, additional tasks are rejected with a RejectedExecutionException on submission. This is certainly not what I want.
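The rejection described above can be reproduced with a small sketch. The class name RejectionDemo and the pool sizes (one thread, one queue slot) are illustrative assumptions, chosen only to make the queue fill up quickly:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original question.
class RejectionDemo {

    /** Returns true if the third submission is rejected instead of blocking. */
    static boolean thirdTaskIsRejected() {
        CountDownLatch release = new CountDownLatch(1);
        // One worker thread and one queue slot: room for two tasks in total.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        Runnable blocker = () -> {
            try {
                release.await(); // keep the task busy until the demo is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker); // occupies the single worker thread
            pool.execute(blocker); // fills the single queue slot
            try {
                pool.execute(blocker); // no free thread, no free slot
                return false;
            } catch (RejectedExecutionException e) {
                return true; // the default handler (AbortPolicy) rejects at once
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("third task rejected: " + thirdTaskIsRejected());
    }
}
```

The executor adds tasks to the queue with a non-blocking offer(), which is why a full bounded queue leads to rejection rather than to a blocked producer.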

I came up with a wrapper for the ExecutorService, which uses a Semaphore to control how many tasks are concurrently accepted, which also allows me to block (with a timeout) when all permits of the Semaphore are acquired:

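import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class Pipeline {

    private final ExecutorService executorService;
    private final Semaphore semaphore;

    private final int queueTimeout;
    private final TimeUnit queueTimeoutUnit;

    public Pipeline(int maxConcurrent, int queueTimeout, TimeUnit queueTimeoutUnit) {
        // One permit per task that may be in flight at the same time.
        semaphore = new Semaphore(maxConcurrent);
        executorService = Executors.newCachedThreadPool();

        this.queueTimeout = queueTimeout;
        this.queueTimeoutUnit = queueTimeoutUnit;
    }

    public <T> Future<T> submit(Callable<T> task) {
        try {
            // Block the producer until a permit is free, or give up after the timeout.
            boolean acquired = semaphore.tryAcquire(queueTimeout, queueTimeoutUnit);
            if (!acquired) {
                throw new RuntimeException("Timeout accepting task");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new RuntimeException(e);
        }
        try {
            return executorService.submit(() -> {
                try {
                    return task.call();
                } finally {
                    // Free the permit when the task ends, normally or with an exception.
                    semaphore.release();
                }
            });
        } catch (RuntimeException e) {
            // The task was never accepted (e.g. after shutdown), so return the permit.
            semaphore.release();
            throw e;
        }
    }

    public void shutdown() {
        executorService.shutdown();
    }
}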

This actually works, but it seems like a common enough use case that it may already be covered by the Java API. Am I missing some built-in functionality for this?
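Note that with a cached thread pool, every accepted task gets its own thread, so maxConcurrent bounds running tasks only and nothing is actually queued. A possible variation, sketched under assumptions, separates the two limits from the requirements list: a fixed pool bounds the threads, and the semaphore holds threads + queueCapacity permits. The names BoundedSubmitter, queueCapacity and thirdSubmissionTimesOut are hypothetical, and the use of TimeoutException is a design choice made for this sketch:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical variation of the wrapper; a sketch, not a drop-in replacement.
class BoundedSubmitter {
    private final ExecutorService executor;
    private final Semaphore permits;

    BoundedSubmitter(int threads, int queueCapacity) {
        executor = Executors.newFixedThreadPool(threads);
        // Running tasks plus waiting tasks may never exceed this number.
        permits = new Semaphore(threads + queueCapacity);
    }

    <T> Future<T> submit(Callable<T> task, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (!permits.tryAcquire(timeout, unit)) {
            throw new TimeoutException("Timeout accepting task");
        }
        try {
            return executor.submit(() -> {
                try {
                    return task.call();
                } finally {
                    permits.release();
                }
            });
        } catch (RuntimeException e) {
            permits.release(); // task was not accepted, return the permit
            throw e;
        }
    }

    void shutdown() {
        executor.shutdown();
    }

    /** Demo: with 1 thread and 1 queue slot, a third submission should time out. */
    static boolean thirdSubmissionTimesOut() {
        BoundedSubmitter submitter = new BoundedSubmitter(1, 1);
        CountDownLatch release = new CountDownLatch(1);
        Callable<Void> blocker = () -> { release.await(); return null; };
        try {
            submitter.submit(blocker, 1, TimeUnit.SECONDS);       // runs on the worker
            submitter.submit(blocker, 1, TimeUnit.SECONDS);       // waits in the queue
            submitter.submit(blocker, 50, TimeUnit.MILLISECONDS); // no permit left
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();
            submitter.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("third submission timed out: " + thirdSubmissionTimesOut());
    }
}
```

One caveat of this approach: if a returned Future is cancelled before its task starts, the wrapping lambda never runs and the permit is not released, so cancellation would need separate handling.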

Peter Walser asked Feb 07 '26 07:02



1 Answer

What if you create a ThreadPoolExecutor, and you provide a fixed-length ArrayBlockingQueue and your own custom RejectedExecutionHandler?

NOTE! I have not tried this myself. It's just an idea.

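import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class MyHandler implements RejectedExecutionHandler {
    final BlockingQueue<Runnable> task_queue;
    final long timeout;
    final TimeUnit unit;

    public MyHandler(
        BlockingQueue<Runnable> task_queue,
        long timeout,
        TimeUnit unit
    ) {
        this.task_queue = task_queue;
        this.timeout = timeout;
        this.unit = unit;
    }

    @Override
    public void rejectedExecution(
        Runnable task,
        ThreadPoolExecutor pool
    ) {
        // The handler is also called after shutdown; do not enqueue in that case.
        if (pool.isShutdown()) {
            throw new RejectedExecutionException("executor is shut down");
        }
        boolean timed_out = false;
        try {
            // Wait up to the timeout for a free slot in the queue.
            timed_out = ! task_queue.offer(task, timeout, unit);
        }
        catch (InterruptedException ex) {
            // One possible choice: restore the interrupt flag and reject the task.
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while waiting", ex);
        }
        if (timed_out) {
            throw new RejectedExecutionException("queue is full");
        }
    }
}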

Then, create the executor:

BlockingQueue<Runnable> task_queue = new ArrayBlockingQueue<>(...);
ExecutorService pool = new ThreadPoolExecutor(
    corePoolSize,  maximumPoolSize,
    keepAliveTime, keepAliveTimeUnit,
    task_queue,
    new MyHandler(task_queue, submitTimeout, submitTimeoutUnit)
);

It's a bit hack-y because it messes with the queue behind the executor's back, but my expectation is, if the queue is full when your code calls pool.submit(task), then the rejectedExecution() method of the handler will be called. It will wait for the specified amount of time to add the task to the queue.

If it succeeds, then the task is added to the queue, and everybody's happy. If it fails, then the RejectedExecutionException will be propagated back for the caller of pool.submit(task) to catch and handle however you see fit.
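Both outcomes can be checked with a small self-contained sketch. It is an approximation of the idea above, not the exact handler: it uses a lambda that reaches the queue through pool.getQueue() instead of a stored reference. The names BlockingHandlerDemo, waitForSpace and extraTaskRuns, as well as the delays, are assumptions made for illustration:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for the blocking rejection handler idea.
class BlockingHandlerDemo {

    /** Handler that waits for queue space, then gives up with an exception. */
    static RejectedExecutionHandler waitForSpace(long timeout, TimeUnit unit) {
        return (task, pool) -> {
            try {
                if (pool.isShutdown() || !pool.getQueue().offer(task, timeout, unit)) {
                    throw new RejectedExecutionException("queue is full");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting", e);
            }
        };
    }

    /**
     * Fills a pool with one thread and one queue slot, frees it after
     * releaseAfterMillis, and submits one extra task in the meantime.
     * Returns true if the extra task eventually ran.
     */
    static boolean extraTaskRuns(long releaseAfterMillis, long handlerTimeoutMillis) {
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch extraRan = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1),
            waitForSpace(handlerTimeoutMillis, TimeUnit.MILLISECONDS));
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // Helper thread that unblocks the running tasks after a delay.
        Thread releaser = new Thread(() -> {
            try {
                Thread.sleep(releaseAfterMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            release.countDown();
        });
        releaser.setDaemon(true);
        try {
            pool.execute(blocker);             // occupies the worker
            pool.execute(blocker);             // fills the queue
            releaser.start();
            pool.execute(extraRan::countDown); // pool is full: goes through the handler
            return extraRan.await(5, TimeUnit.SECONDS);
        } catch (RejectedExecutionException e) {
            return false;                      // handler timed out waiting for space
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("space freed in time, task ran: " + extraTaskRuns(100, 2000));
        System.out.println("queue stayed full, task ran: " + extraTaskRuns(1000, 100));
    }
}
```

The first call gives the handler enough time for a slot to open up, so the extra task is queued and runs. In the second call the handler times out first and the RejectedExecutionException reaches the submitting thread.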

Solomon Slow answered Feb 08 '26 21:02



