Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Testing PriorityBlockingQueue in ThreadPoolExecutor

I realized my ThreadPoolExecutor with PriorityBlockingQueue like in this example: https://stackoverflow.com/a/12722648/2206775

and wrote a test:

PriorityExecutor executorService = (PriorityExecutor)  PriorityExecutor.newFixedThreadPool(16);
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("1");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 1);

    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 3);

    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("2");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 2);

    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("5");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 5);

    executorService.submit(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
                Thread.sleep(1000);
                System.out.println("4");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }, 4);

    executorService.shutdown();
    try {
        executorService.awaitTermination(30, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

But in the end, I don't get 1 2 3 4 5, I get a random order of those numbers. Is there a problem with the test, or something else? And if first, how can it be tested correctly?

like image 483
Dmitry Sobetsky Avatar asked May 30 '13 10:05

Dmitry Sobetsky


People also ask

What is queue capacity in ThreadPoolExecutor?

One of the added Advantage of using ThreadPoolTaskExecutor of Spring is that it is well suited for management and monitoring via JMX. The default configuration of core pool size is 1, max pool size and queue capacity as 2147483647.

How do you prevent ThreadPoolExecutor?

You can call the cancel() function on the Future object to cancel the task before it has started running. If your task has already started running, then calling cancel() will have no effect and you must wait for the task to complete.

What is the use of ThreadPoolExecutor?

ThreadPoolExecutor is an ExecutorService to execute each submitted task using one of possibly several pooled threads, normally configured using Executors factory methods. It also provides various utility methods to check current threads statistics and control them.

What is role of a BlockingQueue in Executor framework?

BlockingQueue is an integral part of an Executor. BlockingQueue is used for storing the tasks when all the available threads are busy executing tasks and a new thread cannot be created.


1 Answers

The priority is only taken into account if the pool is fully busy and you submit several new tasks. If you define your pool with only one thread, you should get the expected output. In your example, all tasks get executed concurrently and which one finishes first is somewhat random.

By the way the linked implementation has a problem and throws an exception if your queue is full and you submit new tasks.

See below a working example of what you are trying to achieve (I have overriden newTaskFor in a simplistic way, just to make it work - you might want to improve that part).

It prints: 1 2 3 4 5.

public class Test {

    public static void main(String[] args) {
        PriorityExecutor executorService = (PriorityExecutor) PriorityExecutor.newFixedThreadPool(1);
        executorService.submit(getRunnable("1"), 1);
        executorService.submit(getRunnable("3"), 3);
        executorService.submit(getRunnable("2"), 2);
        executorService.submit(getRunnable("5"), 5);
        executorService.submit(getRunnable("4"), 4);

        executorService.shutdown();
        try {
            executorService.awaitTermination(30, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static Runnable getRunnable(final String id) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    System.out.println(id);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
    }

    static class PriorityExecutor extends ThreadPoolExecutor {

        public PriorityExecutor(int corePoolSize, int maximumPoolSize,
                                long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        //Utitlity method to create thread pool easily

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new PriorityExecutor(nThreads, nThreads, 0L,
                                        TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        }
        //Submit with New comparable task

        public Future<?> submit(Runnable task, int priority) {
            return super.submit(new ComparableFutureTask(task, null, priority));
        }
        //execute with New comparable task

        public void execute(Runnable command, int priority) {
            super.execute(new ComparableFutureTask(command, null, priority));
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            return (RunnableFuture<T>) callable;
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return (RunnableFuture<T>) runnable;
        }
    }

    static class ComparableFutureTask<T> extends FutureTask<T> implements Comparable<ComparableFutureTask<T>> {

        volatile int priority = 0;

        public ComparableFutureTask(Runnable runnable, T result, int priority) {
            super(runnable, result);
            this.priority = priority;
        }

        public ComparableFutureTask(Callable<T> callable, int priority) {
            super(callable);
            this.priority = priority;
        }

        @Override
        public int compareTo(ComparableFutureTask<T> o) {
            return Integer.valueOf(priority).compareTo(o.priority);
        }
    }
}
like image 127
assylias Avatar answered Sep 21 '22 12:09

assylias