I am getting data from a queue server and I need to process it and send an acknowledgement. Something like this:
while (true) {
    queueserver.get.data
    ThreadPoolExecutor // send data to thread
    queueserver.acknowledgement
}
I don't fully understand what happens in threads, but I think this program gets the data, sends it to the thread pool, and then immediately acknowledges it. So even though each queue is limited to 200 unacknowledged items, the program will just pull items as fast as it can receive them. This is fine when I run the program on a single server, but with multiple workers it becomes an issue, because the number of items in the thread queue reflects not the work a worker has done but how fast it can get items from the queue server.
Is there anything I can do to somehow make the program wait if the thread queue is full of work?
The ThreadPoolExecutor in Python provides a thread pool that lets you run tasks concurrently. You add a task to the pool by calling submit() with the function to run, which returns a Future object. You can call cancel() on the Future object to cancel the task before it has started running.
Once the maximum number of threads (max_workers) is reached, no more are created, and new tasks are queued until a thread is available to run them.
For example, a pool with a maximum of 10 threads processes 10 requests at a time. As soon as any thread finishes, the pool hands it the 11th request, and it keeps doing the same for all the remaining requests.
How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?
Instead of an open-ended queue, you can use a BlockingQueue with a limit on it:
BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(200);
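As a minimal sketch of what the limit means in practice: a bounded queue stops accepting items once it holds its capacity. The class and method names here (BoundedQueueDemo, fillWithoutBlocking) are made up for illustration and are not part of the original answer.

```java
import java.util.Date;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not from the original answer.
class BoundedQueueDemo {

    // Tries to add `attempts` items to a queue of the given capacity
    // and returns how many were actually accepted.
    static int fillWithoutBlocking(int capacity, int attempts) {
        BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false instead of blocking when the queue is full;
            // put() would block at this point until a consumer takes an item.
            if (queue.offer(new Date())) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(fillWithoutBlocking(200, 250));
    }
}
```

With a capacity of 200, the 201st offer() is refused. A producer that calls put() instead would simply wait there, which is the back-pressure the question is asking for.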
In terms of jobs submitted to an ExecutorService, instead of using the default ExecutorServices created using Executors, which use an unbounded queue, you can create your own:
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(200));
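To see what such a pool does when it is saturated, here is a small sketch under assumed names (BoundedQueueRejectionDemo, submitUntilRejected). Every task parks on a latch so that no thread frees up, and the method counts how many tasks are accepted before the default handler throws.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not from the original answer.
class BoundedQueueRejectionDemo {

    // Returns how many tasks were accepted before the pool rejected one.
    static int submitUntilRejected(int nThreads, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        int accepted = 0;
        try {
            while (true) {
                // Each task waits on the latch, so running threads stay busy.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Default AbortPolicy: thrown once all threads are busy and the queue is full.
        } finally {
            release.countDown();
            pool.shutdown();
            try {
                pool.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(submitUntilRejected(2, 5));
    }
}
```

With 2 threads and a queue of 5, the pool takes 7 tasks (2 running plus 5 queued) and rejects the next one.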
Once the queue fills up, the executor will reject any new tasks that are submitted. To make the submitter block instead, you will need to set a RejectedExecutionHandler that puts the task on the queue itself. Something like:
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(200);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS, queue);
// by default (unfortunately) the ThreadPoolExecutor will throw an exception
// when you submit a job while the queue is full, to have it block you do:
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // this will block if the queue is full
            executor.getQueue().put(r);
        } catch (InterruptedException ie) {
            // put() is interruptible: restore the flag and reject the task
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException(
                    "Interrupted while queueing " + r, ie);
        }
        // check afterwards and throw if pool shutdown
        if (executor.isShutdown()) {
            throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + executor);
        }
    }
});
I think it's a major miss that Java doesn't have a ThreadPoolExecutor.CallerBlocksPolicy.
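To tie this back to the loop in the question, here is a rough, self-contained sketch of a fetch/submit/acknowledge loop running against a pool with a blocking handler. The queue server calls are only comments, and the names BlockingSubmitDemo, newBlockingPool and processAll are assumptions made for the example.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not from the original answer.
class BlockingSubmitDemo {

    // Builds a fixed-size pool whose execute() blocks when the queue is full.
    static ThreadPoolExecutor newBlockingPool(int nThreads, int capacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(nThreads, nThreads,
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(capacity));
        pool.setRejectedExecutionHandler((r, executor) -> {
            try {
                executor.getQueue().put(r); // blocks until there is room
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while queueing " + r, e);
            }
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("Task " + r + " rejected from " + executor);
            }
        });
        return pool;
    }

    // Simulates pulling `items` messages and returns how many were processed.
    static int processAll(int items) {
        ThreadPoolExecutor pool = newBlockingPool(2, 4);
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < items; i++) {
            // Stand-in for queueserver.get.data
            pool.execute(() -> {
                try {
                    Thread.sleep(1); // pretend to do some work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                processed.incrementAndGet();
            });
            // Stand-in for queueserver.acknowledgement: this point is only
            // reached once the pool has accepted the task, so the loop slows
            // down to the pace of the workers.
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println(processAll(50));
    }
}
```

In this sketch at most 2 running plus 4 queued items are in flight at any time, so the number of unacknowledged or freshly acknowledged items a worker holds is bounded by the pool configuration rather than by network speed.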
If you want the acknowledgment when the worker starts working on the task, you can make a custom ThreadFactory that sends the acknowledgment from the thread before doing the actual work. Or you can override beforeExecute of a ThreadPoolExecutor.
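The beforeExecute route might look roughly like the following. This is a sketch only: AckOnStartExecutor, AckOnStartDemo and the acknowledge callback are hypothetical names, and a real acknowledgment would need to know which message the task belongs to.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical executor that runs an acknowledgment callback just before
// each task starts on a worker thread.
class AckOnStartExecutor extends ThreadPoolExecutor {
    private final Runnable acknowledge; // stand-in for queueserver.acknowledgement

    AckOnStartExecutor(int nThreads, int queueCapacity, Runnable acknowledge) {
        super(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity));
        this.acknowledge = acknowledge;
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        // Runs on the worker thread, immediately before r.run().
        acknowledge.run();
    }
}

// Hypothetical demo class recording the order of acknowledgment and work.
class AckOnStartDemo {
    static List<String> runTasks(int tasks) {
        List<String> events = Collections.synchronizedList(new ArrayList<String>());
        // One thread keeps the event order deterministic for the demo.
        AckOnStartExecutor pool = new AckOnStartExecutor(1, tasks, () -> events.add("ack"));
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> events.add("work"));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(events);
    }

    public static void main(String[] args) {
        System.out.println(runTasks(3));
    }
}
```

Note that when a task is handed over with submit() rather than execute(), the Runnable passed to beforeExecute is the wrapping FutureTask, not the original task object.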
If you want the acknowledgment when a new worker is freed up for a new task, I think you can initialize a ThreadPoolExecutor with a SynchronousQueue and a ThreadPoolExecutor.CallerRunsPolicy, or with your own policy where the caller blocks.
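A small sketch of the SynchronousQueue plus CallerRunsPolicy combination, using made-up names (HandoffPoolDemo, overflowRunsOnCaller). It keeps the only worker busy and then checks which thread ends up running the next task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not from the original answer.
class HandoffPoolDemo {

    // Returns true if a task submitted while the only worker is busy
    // was executed by the submitting thread itself.
    static boolean overflowRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> ranOn = new AtomicReference<Thread>();

        // The first task occupies the only worker until the latch is released.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // A SynchronousQueue holds nothing, and no idle worker is waiting for
        // a hand-off, so this task is rejected and CallerRunsPolicy runs it
        // right here on the submitting thread.
        pool.execute(() -> ranOn.set(Thread.currentThread()));

        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ranOn.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println(overflowRunsOnCaller());
    }
}
```

Because the submitting thread is busy running the overflow task, it cannot pull more items from the queue server in the meantime. That throttles the loop, at the cost of the fetch thread doing worker work.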