I have a producer that reads blocks of text from the disk. Multiple consumers are doing computations on that blocks.
I would like producer to pause reading data from the disk if there are more that n blocks currently being computed over.
Have put it in pseudocode to illustrate what I would like to achieve.
// "produceBlocks" reads blocks from disk one by one
// and feeds them to lambda
produceBlocks(block -> {
// (!) if activeCounter exceeds a THRESHOLD, then pause
executorService.submit(() -> {
activeCounter.incrementAndGet();
// do some work
activeCounter.decrementAndGet();
});
});
I would use a fixed length queue for your thread pool and implement a RejectedExecuptionHandler to either run in the current thread or to pause and retry.
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/RejectedExecutionHandler.html#rejectedExecution(java.lang.Runnable,%20java.util.concurrent.ThreadPoolExecutor)
e.g.
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.CallerRunsPolicy.html
This last option I have used effectively and it doesn't require extra code once the ExecutorService is configured.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With