
Dynamic priorities for a heterogeneous task set

I've got a bunch of repeating tasks to schedule. They query the database to find out what to do and then execute some action, such as updating statistics, sending emails, or fetching files and importing them. Currently, there are maybe ten of them, and this number is expected to grow a lot. I'm not given any timing constraints; actually, it's my job to choose an algorithm so that nobody complains. :D

Currently, I'm using an ad-hoc combination of threads and periodically scheduled tasks like

  • the most important task has its own thread, which falls back to a short sleep when idle (and can be woken up when new important work arrives).
  • another important task is scheduled once per hour in its own thread
  • medium importance tasks are scheduled periodically to "fill the holes", so that probably only one of them runs at any moment
  • the least important tasks are all processed by a single dedicated thread

It seems to work well at the moment, but it's not future-proof and it doesn't feel right for these reasons:

  • As the queue for the least important tasks may grow a lot, such tasks may be delayed indefinitely.
  • Filling the holes may go wrong and there may be many tasks running at once.
  • The number of tasks running at any given moment should depend on the server load. (*)

(*) It's primarily a web server and serving requests is actually the highest priority. Getting a separate server wouldn't help, as the bottleneck is usually the database. Currently, it works fine, but I'm looking for a better solution as we hope that the load grows by a factor of 100 in a year or two.

My idea is to increase the priority of a job, when it was delayed too much. For example, there are statistics running hourly and delaying them by a few hours is no big deal, but it shouldn't be a whole day and it mustn't be a whole week.
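One way to pin down such a formula, purely as a sketch: raise the priority by one level for every fixed interval a task is overdue, up to a cap. The class name `AgingPriority`, the integer priority scale, and the 8-hour step used in the usage note are all hypothetical choices for illustration.

```java
import java.time.Duration;

// Hypothetical aging rule, not taken from any library.
class AgingPriority {
    /**
     * Returns basePriority plus one level per full `step` the task is overdue,
     * capped at maxPriority. A task that is not overdue keeps its base priority.
     */
    static int effectivePriority(int basePriority, Duration overdue,
                                 Duration step, int maxPriority) {
        if (step.isZero() || step.isNegative()) {
            throw new IllegalArgumentException("step must be positive");
        }
        if (overdue.isZero() || overdue.isNegative()) {
            return basePriority;
        }
        long boost = overdue.toMillis() / step.toMillis();
        return (int) Math.min(maxPriority, basePriority + boost);
    }
}
```

With a base priority of 0, a step of 8 hours and a cap of 2, the hourly statistics job would stay at 0 when it is 3 hours late, move to 1 after 10 hours, and reach the cap of 2 once it is a whole day late.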

I'd be happy to replace all my AbstractExecutionThreadServices and AbstractScheduledServices with something that works as follows:

  • Start the highest priority tasks immediately, no matter what.
  • Start the medium priority tasks only when the total load is "small".
  • Start the lowest priority tasks only when the system is "mostly idle".
  • Increase the priorities for delayed tasks using a supplied formula.

This surely sounds pretty fuzzy and getting it more precise is a part of what I'm asking. My competing goals are

  • Never delay the important tasks needlessly.
  • Never let too many concurrently running tasks slow down the server too much.

There are no hard deadlines and there's no need to minimize the number of threads used. I don't insist on a solution doing exactly what I described, and I'm not looking for a library (nor do I insist on reinventing the wheel). I don't think that a cron-like scheduler is the right solution.

asked by maaartinus, Nov 08 '22 08:11


1 Answer

Working with the ExecutorService model, the classic solution to reordering executor tasks is to create a ThreadPoolExecutor with a PriorityBlockingQueue feeding it the tasks - as described here.
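A minimal sketch of that classic setup, assuming a hypothetical `PrioritizedTask` wrapper with an integer priority where lower values run first. Neither class name comes from the JDK or from the original answer. A single worker thread is used so that the reordering is easy to see.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical task type: a Runnable ordered by priority (lower value runs first).
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
    final int priority;
    final Runnable body;

    PrioritizedTask(int priority, Runnable body) {
        this.priority = priority;
        this.body = body;
    }

    @Override public void run() { body.run(); }

    @Override public int compareTo(PrioritizedTask other) {
        return Integer.compare(priority, other.priority);
    }
}

class PriorityExecutorDemo {
    // Runs three queued tasks on one worker and returns the order in which they ran.
    static List<String> runDemo() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Occupy the only worker so that the following tasks pile up in the queue.
            executor.execute(new PrioritizedTask(0, () -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
            // execute(), not submit(): submit() wraps the task in a FutureTask,
            // which is not Comparable and would fail inside the priority queue.
            executor.execute(new PrioritizedTask(3, () -> order.add("low")));
            executor.execute(new PrioritizedTask(1, () -> order.add("high")));
            executor.execute(new PrioritizedTask(2, () -> order.add("medium")));

            gate.countDown();      // release the worker
            executor.shutdown();   // queued tasks still run after shutdown()
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return order;
    }
}
```

Here `runDemo()` returns the tasks in priority order (high, medium, low) even though they were queued as low, high, medium. Note that this only reorders tasks that are already waiting; it does not delay anything.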

However, needing to schedule the tasks as well puts a spin on it. ScheduledThreadPoolExecutor uses an internal custom BlockingQueue to feed the tasks in when the schedule is ready, but as I think you're well aware, it's not easily open to further customisation.

At a glance, DelayQueue looks like it fits the bill perfectly: it can prioritise the next Delayed element or task, and it lets Delayed.getDelay() make a late decision about whether that element is ready to go.

The fly in the ointment with this plan is when you try to pass something like DelayQueue<DelayedRunnable> into the constructor of ThreadPoolExecutor. This will only accept a BlockingQueue<Runnable>, not BlockingQueue<? extends Runnable>.

One way out of this is to create a minimal implementation of BlockingQueue<Runnable> that delegates to a DelayQueue<DelayedRunnable>. The basics are here:

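import java.util.AbstractQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

// Adapter that exposes a DelayQueue<DelayedRunnable> as the
// BlockingQueue<Runnable> that ThreadPoolExecutor's constructor requires.
public class BlockingDelayQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> {

    private final DelayQueue<DelayedRunnable> delayQueue;

    public BlockingDelayQueue(DelayQueue<DelayedRunnable> delayQueue) {
        this.delayQueue = delayQueue;
    }

    @Override
    public boolean isEmpty() {
        return delayQueue.isEmpty();
    }

    // Waits up to the timeout for an element whose delay has expired,
    // then hands the wrapped Runnable to the executor.
    @Override
    public Runnable poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        DelayedRunnable delayedRunnable = delayQueue.poll(timeout, unit);

        if (delayedRunnable == null)
            return null;
        return delayedRunnable.getCommand();
    }
    ...
}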

The experimental version of DelayedRunnable used to prove the idea there uses a simple Priority enum that checks the 'busyness' of the executor:

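import java.util.concurrent.ThreadPoolExecutor;

enum Priority {
    // Runs only when the executor is completely idle.
    LOW {
        @Override
        boolean isReady(ThreadPoolExecutor executor) {
            return executor.getActiveCount() == 0;
        }
    },
    // Runs when at most one other task is active.
    MEDIUM {
        @Override
        boolean isReady(ThreadPoolExecutor executor) {
            return executor.getActiveCount() <= 1;
        }
    },
    // Always allowed to run.
    HIGH {
        @Override
        boolean isReady(ThreadPoolExecutor executor) {
            return true;
        }
    };

    // Each constant decides whether a task of its priority may start now.
    abstract boolean isReady(ThreadPoolExecutor executor);
}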

Which DelayedRunnable.getDelay() can then check:

@Override
public long getDelay(TimeUnit unit) {
    long millis;
    if (!priority.isReady(executor))
        millis = 1000;
    else
        millis = time - System.currentTimeMillis();
    return unit.convert(millis, TimeUnit.MILLISECONDS);
}

This works so long as getDelay() never returns a value <= 0 while the priority isn't ready yet.
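To see that gating rule in isolation, here is a small self-contained sketch. `GatedTask` and `GatedTaskDemo` are hypothetical names, and a plain `BooleanSupplier` stands in for the executor-based readiness check, so this is not the DelayedRunnable from the answer.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical element type: available only once its start time has passed
// AND the supplied readiness check (e.g. "is the executor idle?") holds.
class GatedTask implements Delayed {
    final String name;
    final long startAtMillis;
    final BooleanSupplier ready;

    GatedTask(String name, long delayMillis, BooleanSupplier ready) {
        this.name = name;
        this.startAtMillis = System.currentTimeMillis() + delayMillis;
        this.ready = ready;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // While the gate is closed, always report a positive delay (1 s here).
        long millis = ready.getAsBoolean()
                ? startAtMillis - System.currentTimeMillis()
                : 1000;
        return unit.convert(millis, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // Order by the fixed start time, which never changes after insertion.
        return Long.compare(startAtMillis, ((GatedTask) other).startAtMillis);
    }
}

class GatedTaskDemo {
    // Returns { element polled while the gate is closed, element polled after it opens }.
    static GatedTask[] runDemo() {
        AtomicBoolean idle = new AtomicBoolean(false);
        DelayQueue<GatedTask> queue = new DelayQueue<>();
        queue.add(new GatedTask("Low 1", 0, idle::get));

        GatedTask whileBusy = queue.poll(); // gate closed: positive delay, so nothing is handed out
        idle.set(true);
        GatedTask whenIdle = queue.poll();  // gate open and start time reached
        return new GatedTask[] { whileBusy, whenIdle };
    }
}
```

One thing to keep in mind with this approach: DelayQueue only ever examines its head element, so a head whose gate is closed also holds back any ready elements queued behind it.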

This seemed to work well, e.g. launching a standard 2s sleep task here...

    DelayedScheduler scheduler = new DelayedScheduler();
    scheduler.schedule(task("Low 1"), 1, TimeUnit.SECONDS, Priority.LOW);
    scheduler.schedule(task("Low 2"), 2, TimeUnit.SECONDS, Priority.LOW);
    scheduler.schedule(task("Low 3"), 3, TimeUnit.SECONDS, Priority.LOW);
    scheduler.schedule(task("Medium 1"), 1, TimeUnit.SECONDS, Priority.MEDIUM);
    scheduler.schedule(task("Medium 2"), 2, TimeUnit.SECONDS, Priority.MEDIUM);
    scheduler.schedule(task("Medium 3"), 3, TimeUnit.SECONDS, Priority.MEDIUM);
    scheduler.schedule(task("High 1"), 1, TimeUnit.SECONDS, Priority.HIGH);
    scheduler.schedule(task("High 2"), 2, TimeUnit.SECONDS, Priority.HIGH);
    scheduler.schedule(task("High 3"), 3, TimeUnit.SECONDS, Priority.HIGH);

... produced about the right results:

High 1 started at 1087ms
Medium 1 started at 1087ms
High 2 started at 2087ms
  Medium 1 ended at 3087ms
  High 1 ended at 3087ms
High 3 started at 3087ms
  High 2 ended at 4088ms
Medium 2 started at 4088ms
  High 3 ended at 5088ms
Medium 3 started at 5088ms
  Medium 2 ended at 6088ms
  Medium 3 ended at 7089ms
Low 1 started at 7089ms
  Low 1 ended at 9089ms
Low 2 started at 9089ms
  Low 2 ended at 11089ms
Low 3 started at 11089ms
  Low 3 ended at 13089ms

Medium priority tasks were allowed to start while only one High priority task was running, and Low priority tasks only once nothing else was running.

(DelayedScheduler and the other unseen bits on GitHub).

answered by df778899, Nov 14 '22 23:11