Here is my use case.
A legacy system updates a database queue table QUEUE.
I want a scheduled recurring job that - checks the contents of QUEUE - if there are rows in the table it locks the row and does some work - deletes the row in QUEUE
If the previous job is still running, then a new thread will be created to do the work. I want to configure the maximum number of concurrent threads.
I am using Spring 3 and my current solution is to do the following (using a fixedRate of 1 millisecond to get the threads to run basically continuously)
@Scheduled(fixedRate = 1)
@Async
public void doSchedule() throws InterruptedException {
log.debug("Start schedule");
publishWorker.start();
log.debug("End schedule");
}
<task:executor id="workerExecutor" pool-size="4" />
This created 4 threads straight off and the threads correctly shared the workload from the queue. However I seem to be getting a memory leak when the threads take a long time to complete.
java.util.concurrent.ThreadPoolExecutor @ 0xe097b8f0 | 80 | 373,410,496 | 89.74%
|- java.util.concurrent.LinkedBlockingQueue @ 0xe097b940 | 48 | 373,410,136 | 89.74%
| |- java.util.concurrent.LinkedBlockingQueue$Node @ 0xe25c9d68
So
1: Should I be using @Async and @Scheduled together?
2: If not then how else can I use spring to achieve my requirements?
3: How can I create the new threads only when the other threads are busy?
Thanks all!
EDIT: I think the queue of jobs was getting infinitely long... Now using
<task:executor id="workerExecutor"
pool-size="1-4"
queue-capacity="10" rejection-policy="DISCARD" />
Will report back with results
You can try
Of course, you'll have to handle the scenario, where the scheduler has locked a QUEUE, but the handler didn't finish processing it for whatever reason.
Pseudo code:
public class QueueScheduler {
@AutoWired
private QueueHandler queueHandler;
@Scheduled(fixedDelay = 1000)
public void doSchedule() throws InterruptedException {
log.debug("Start schedule");
List<Long> queueIds = lockAndFetchAllUnlockedQueues();
for (long id : queueIds)
queueHandler.process(id);
log.debug("End schedule");
}
}
public class QueueHandler {
@Async
public void process(long queueId) {
// process the QUEUE & delete it from DB
}
}
<task:executor id="workerExecutor" pool-size="1-4" queue-capcity="10"
rejection-policy="ABORT"/>
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With