I've got the following requirement for an ExecutorService: task submission should block (with a timeout) once a configurable number of tasks is pending, so that producers experience backpressure instead of the queue growing without bound.
Reason: the producers of the tasks may overwhelm the executor service or its queue, resulting in increased memory demand when too many tasks pile up. Backpressure will block the producers in such a situation until processing can catch up. The producers (some reading from Kafka, others reading from JDBC) may have open transactions which can time out, so the submission of new tasks, if it blocks, should be able to time out as well.
Since the executor service implementations, such as ThreadPoolExecutor, use a BlockingQueue to enqueue tasks, I would have expected the queue to provide exactly this feature: accept only a defined number of tasks and then block until tasks are pulled from the queue by the worker threads. Instead, it turned out that when the queue is full, additional tasks are rejected with a RejectedExecutionException on submission. This is certainly not what I want.
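For illustration, the rejection behavior is easy to reproduce with a minimal snippet (pool sizes, queue capacity, and the latch are arbitrary choices just to keep the single worker busy):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionDemo {
    public static void main(String[] args) throws Exception {
        // One worker thread, queue capacity of one task
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1));
        CountDownLatch block = new CountDownLatch(1);
        pool.submit(() -> { block.await(); return null; }); // occupies the worker
        pool.submit(() -> null);                            // fills the queue
        try {
            pool.submit(() -> null);                        // queue full -> rejected immediately
            System.out.println("accepted");
        } catch (RejectedExecutionException e) {
            System.out.println("rejected");
        }
        block.countDown();
        pool.shutdown();
    }
}
```

The third submission fails immediately rather than blocking, which is the behavior described above.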
I came up with a wrapper for the ExecutorService, which uses a Semaphore to control how many tasks are concurrently accepted, which also allows me to block (with a timeout) when all permits of the Semaphore are acquired:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class Pipeline {

    private final ExecutorService executorService;
    private final Semaphore semaphore;
    private final int queueTimeout;
    private final TimeUnit queueTimeoutUnit;

    public Pipeline(int maxConcurrent, int queueTimeout, TimeUnit queueTimeoutUnit) {
        semaphore = new Semaphore(maxConcurrent);
        executorService = Executors.newCachedThreadPool();
        this.queueTimeout = queueTimeout;
        this.queueTimeoutUnit = queueTimeoutUnit;
    }

    public <T> Future<T> submit(Callable<T> task) {
        try {
            // Block until a permit becomes available, or give up after the timeout
            boolean acquired = semaphore.tryAcquire(queueTimeout, queueTimeoutUnit);
            if (!acquired) {
                throw new RuntimeException("Timeout accepting task");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return executorService.submit(() -> {
            try {
                return task.call();
            } finally {
                // Free the permit once the task has finished
                semaphore.release();
            }
        });
    }

    public void shutdown() {
        executorService.shutdown();
    }
}
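To show the intended behavior, here's a small self-contained check (a condensed copy of the class above, with an arbitrary 100 ms timeout and a latch to keep the first task running):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class PipelineDemo {
    // Condensed copy of the Pipeline wrapper above
    static class Pipeline {
        private final ExecutorService executorService = Executors.newCachedThreadPool();
        private final Semaphore semaphore;
        private final int queueTimeout;
        private final TimeUnit queueTimeoutUnit;

        Pipeline(int maxConcurrent, int queueTimeout, TimeUnit queueTimeoutUnit) {
            this.semaphore = new Semaphore(maxConcurrent);
            this.queueTimeout = queueTimeout;
            this.queueTimeoutUnit = queueTimeoutUnit;
        }

        <T> Future<T> submit(Callable<T> task) {
            try {
                if (!semaphore.tryAcquire(queueTimeout, queueTimeoutUnit)) {
                    throw new RuntimeException("Timeout accepting task");
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return executorService.submit(() -> {
                try { return task.call(); } finally { semaphore.release(); }
            });
        }

        void shutdown() { executorService.shutdown(); }
    }

    public static void main(String[] args) throws Exception {
        Pipeline pipeline = new Pipeline(1, 100, TimeUnit.MILLISECONDS);
        CountDownLatch release = new CountDownLatch(1);
        pipeline.submit(() -> { release.await(); return null; }); // holds the only permit
        try {
            pipeline.submit(() -> null); // no permit available -> times out
            System.out.println("accepted");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
        release.countDown();
        pipeline.shutdown();
    }
}
```

The second submission blocks for up to 100 ms and then fails with the timeout exception, which is exactly the backpressure behavior I'm after.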
This actually works, but seems like a common enough use case that may already be covered in the Java API. Am I missing some built-in functionality for this?
What if you create a ThreadPoolExecutor, and you provide a fixed-length ArrayBlockingQueue and your own custom RejectedExecutionHandler?
NOTE! I have not tried this myself. It's just an idea.
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class MyHandler implements RejectedExecutionHandler {

    final BlockingQueue<Runnable> task_queue;
    final long timeout;
    final TimeUnit unit;

    public MyHandler(
        BlockingQueue<Runnable> task_queue,
        long timeout,
        TimeUnit unit
    ) {
        this.task_queue = task_queue;
        this.timeout = timeout;
        this.unit = unit;
    }

    @Override
    public void rejectedExecution(
        Runnable task,
        ThreadPoolExecutor pool
    ) {
        boolean timed_out = false;
        try {
            // Re-offer the rejected task, this time waiting up to the timeout
            timed_out = ! task_queue.offer(task, timeout, unit);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException("interrupted while waiting for queue space", ex);
        }
        if (timed_out) {
            throw new RejectedExecutionException("queue is full");
        }
    }
}
Then, create the executor:
BlockingQueue<Runnable> task_queue = new ArrayBlockingQueue<>(...);

ExecutorService pool = new ThreadPoolExecutor(
    corePoolSize, maximumPoolSize,
    keepAliveTime, keepAliveTimeUnit,
    task_queue,
    new MyHandler(task_queue, submitTimeout, submitTimeoutUnit)
);
It's a bit hacky because it messes with the queue behind the executor's back, but my expectation is that if the queue is full when your code calls pool.submit(task), the rejectedExecution() method of the handler will be called, and it will wait up to the specified amount of time to add the task to the queue.
If it succeeds, the task is added to the queue and everybody's happy. If it fails, the RejectedExecutionException propagates back to the caller of pool.submit(task), to catch and handle however you see fit.
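To illustrate, here's a condensed, self-contained version of the idea (the handler is written as a lambda, and the 200 ms timeout, pool sizes, and latch are arbitrary values just for the demonstration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockingSubmitDemo {
    public static void main(String[] args) throws Exception {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1);
        // On rejection, re-offer the task with a timeout, as MyHandler does above
        RejectedExecutionHandler handler = (r, executor) -> {
            try {
                if (!queue.offer(r, 200, TimeUnit.MILLISECONDS)) {
                    throw new RejectedExecutionException("queue is full");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted", e);
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, queue, handler);
        CountDownLatch block = new CountDownLatch(1);
        pool.execute(() -> {
            try { block.await(); } catch (InterruptedException ignored) { }
        });
        pool.execute(() -> { }); // fills the one queue slot
        try {
            pool.execute(() -> { }); // waits up to 200 ms, then gives up
            System.out.println("accepted");
        } catch (RejectedExecutionException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        block.countDown();
        pool.shutdown();
    }
}
```

With the worker blocked and the queue full, the third task waits the full timeout inside the handler and then fails with the "queue is full" exception, so the caller gets the blocking-with-timeout semantics.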