
Using Spring @Scheduled and @Async together

Here is my use case.

A legacy system updates a database queue table QUEUE.

I want a scheduled recurring job that:

- checks the contents of QUEUE
- if there are rows in the table, locks the row and does some work
- deletes the row in QUEUE

If the previous job is still running, a new thread should be created to do the work. I also want to configure the maximum number of concurrent threads.

I am using Spring 3, and my current solution is the following (a fixedRate of 1 millisecond keeps the threads running more or less continuously):

@Scheduled(fixedRate = 1)
@Async
public void doSchedule() throws InterruptedException {
    log.debug("Start schedule");
    publishWorker.start();
    log.debug("End schedule");
}

<task:executor id="workerExecutor" pool-size="4" />

This created 4 threads straight away, and the threads correctly shared the workload from the queue. However, I seem to be getting a memory leak when the threads take a long time to complete:

java.util.concurrent.ThreadPoolExecutor @ 0xe097b8f0                              |              80 |   373,410,496 |     89.74%
|- java.util.concurrent.LinkedBlockingQueue @ 0xe097b940                          |           48 |   373,410,136 |     89.74%
|  |- java.util.concurrent.LinkedBlockingQueue$Node @ 0xe25c9d68  

So

1: Should I be using @Async and @Scheduled together?

2: If not, how else can I use Spring to achieve my requirements?

3: How can I create the new threads only when the other threads are busy?

Thanks all!

EDIT: I think the queue of jobs was growing without bound. I am now using:

    <task:executor id="workerExecutor"
    pool-size="1-4"
    queue-capacity="10" rejection-policy="DISCARD" />

I will report back with the results.
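
For what it's worth, the heap dump is consistent with the EDIT's diagnosis: a `<task:executor>` without `queue-capacity` is backed by an unbounded queue, so a 1 ms `fixedRate` trigger can enqueue `@Async` invocations much faster than four slow workers drain them. Below is a minimal sketch of that effect using plain `java.util.concurrent` rather than Spring. The class name `BacklogDemo`, the 1000 submissions, and the latch that stands in for slow work are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BacklogDemo {

    /** Submits tasks that block until released and returns how many are left waiting in the queue. */
    static int backlogAfter(ThreadPoolExecutor pool, int submissions) {
        CountDownLatch release = new CountDownLatch(1); // stands in for "slow work"
        try {
            for (int i = 0; i < submissions; i++) {
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getQueue().size();
        } finally {
            release.countDown();
            pool.shutdownNow(); // drop whatever is still queued
        }
    }

    /** Roughly what pool-size="4" with no queue-capacity gives you: 4 threads, unbounded queue. */
    static ThreadPoolExecutor unbounded() {
        return new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    /** Roughly pool-size="1-4", queue-capacity="10", rejection-policy="DISCARD". */
    static ThreadPoolExecutor bounded() {
        return new ThreadPoolExecutor(1, 4, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                new ThreadPoolExecutor.DiscardPolicy());
    }

    public static void main(String[] args) {
        // 4 tasks occupy the threads, the other 996 pile up in the queue
        System.out.println("unbounded backlog: " + backlogAfter(unbounded(), 1000)); // 996
        // the queue never holds more than 10; the overflow is silently discarded
        System.out.println("bounded backlog:   " + backlogAfter(bounded(), 1000));   // 10
    }
}
```

The bounded pool caps memory use, but with DISCARD the dropped invocations simply vanish, which is only acceptable here because the next scheduled run will look at QUEUE again.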

user1980833 asked Jan 31 '13 10:01


1 Answer

You can try the following:

  1. Run a scheduler with a one-second delay that locks and fetches all QUEUE records that aren't locked yet.
  2. For each record, call an @Async method that processes the record and deletes it.
  3. Set the executor's rejection policy to ABORT, so that a rejected submission throws in the scheduler thread and the scheduler can unlock the QUEUE records that weren't handed out for processing. The scheduler can then retry those records in the next run.

Of course, you'll have to handle the scenario where the scheduler has locked a QUEUE record but the handler didn't finish processing it for whatever reason.
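
One possible way to cover that scenario, which this answer does not prescribe, is to record when each row was locked and release locks that are older than some timeout. The sketch below simulates this in memory; the idea of a `LOCKED_AT` column, the class name `StaleLockDemo`, and the 30-second timeout are all hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class StaleLockDemo {
    // Stand-in for a hypothetical LOCKED_AT column: queue id -> time the row was locked
    private final Map<Long, Instant> lockedAt = new TreeMap<>();

    void lock(long id, Instant now) {
        lockedAt.put(id, now);
    }

    /** Releases every lock held longer than the timeout and returns the released ids. */
    List<Long> releaseStale(Instant now, Duration timeout) {
        List<Long> released = new ArrayList<>();
        Iterator<Map.Entry<Long, Instant>> it = lockedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Instant> entry = it.next();
            if (Duration.between(entry.getValue(), now).compareTo(timeout) > 0) {
                released.add(entry.getKey());
                it.remove(); // the row becomes visible to the next scheduler run
            }
        }
        return released;
    }

    public static void main(String[] args) {
        StaleLockDemo locks = new StaleLockDemo();
        Instant t0 = Instant.parse("2013-01-31T10:00:00Z");
        locks.lock(1L, t0);                 // locked 60 s before the check
        locks.lock(2L, t0.plusSeconds(50)); // locked 10 s before the check
        System.out.println(locks.releaseStale(t0.plusSeconds(60), Duration.ofSeconds(30))); // [1]
    }
}
```

The timeout has to be comfortably longer than the slowest legitimate processing time, otherwise a row that is still being worked on could be handed out twice.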

Pseudo code:

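import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.task.TaskRejectedException;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;

public class QueueScheduler {
    @Autowired
    private QueueHandler queueHandler;

    // fixedDelay waits 1000 ms after the previous run ends, so scheduler runs never overlap
    @Scheduled(fixedDelay = 1000)
    public void doSchedule() throws InterruptedException {
        log.debug("Start schedule");
        // placeholder: lock every unlocked QUEUE record and return the ids
        List<Long> queueIds = lockAndFetchAllUnlockedQueues();
        for (long id : queueIds) {
            try {
                queueHandler.process(id); // returns at once; the work runs on workerExecutor
            } catch (TaskRejectedException e) {
                // ABORT policy: pool and queue are full, so release the record for the next run
                unlock(id); // placeholder
            }
        }
        log.debug("End schedule");
    }
}

public class QueueHandler {

    @Async
    public void process(long queueId) {
        // process the QUEUE record & delete it from DB
    }
}

<task:executor id="workerExecutor" pool-size="1-4" queue-capacity="10"
     rejection-policy="ABORT"/>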
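
The approach depends on the rejection surfacing in the scheduler thread, so that it knows which records to unlock. As a rough illustration outside Spring, the sketch below uses a plain `ThreadPoolExecutor` sized like the config above (1 to 4 threads, queue of 10, abort policy) and collects the ids whose submission was refused. The class name `AbortAndUnlockDemo`, the 20 locked rows, and the latch that simulates slow processing are assumptions; in Spring the exception seen by the caller would be `TaskRejectedException` rather than the raw `RejectedExecutionException`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class AbortAndUnlockDemo {

    /** Submits one slow task per locked id and returns the ids the pool refused to take. */
    static List<Long> run(int lockedRows) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                new ThreadPoolExecutor.AbortPolicy());
        CountDownLatch release = new CountDownLatch(1); // keeps every worker busy
        List<Long> toUnlock = new ArrayList<>();
        try {
            for (long id = 1; id <= lockedRows; id++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    // pool and queue are full: this record must be unlocked and retried later
                    toUnlock.add(id);
                }
            }
            return toUnlock;
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // 1 core thread + 10 queued + 3 extra threads accept ids 1..14; the rest are rejected
        System.out.println("unlock: " + run(20)); // unlock: [15, 16, 17, 18, 19, 20]
    }
}
```

Unlike DISCARD, nothing is lost silently here: every refused record is known to the caller and can be released straight away.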
Karthikeyan Marudhachalam answered Sep 21 '22 09:09