Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do you limit threads in the ExecutorService?

I use the ExecutorService to run many tasks in different threads. Sometimes, too many Runnable instances waiting in the thread pool may cause the Out Of Memory problem.

I try to write a blocking job executor to solve it. Is there any official solution to do it ?

For example:

    BlockingJobExecutor executor = new BlockingJobExecutor(3);
    for (int i = 0; i < 1000; i++) {
        executor.addJob(new Runnable() {

            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                LogFactory.getLog(BTest.class).info("test " + System.currentTimeMillis());
            }
        });
    }
    executor.shutdown();

here is the BlockingJobExecutor class:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingJobExecutor {

    AtomicInteger counter = new AtomicInteger();
    ExecutorService service;
    int threads;

    public BlockingJobExecutor(int threads) {
        if (threads < 1) {
            throw new IllegalArgumentException("threads must be greater than 1.");
        }
        service = Executors.newFixedThreadPool(threads);
        this.threads = threads;
    }

    static class JobWrapper implements Runnable {
        BlockingJobExecutor executor;
        Runnable job;

        public JobWrapper(BlockingJobExecutor executor, Runnable job) throws InterruptedException {
            synchronized (executor.counter) {
                while (executor.counter.get() >= executor.limit()) {
                    executor.counter.wait();
                }
            }
            this.executor = executor;
            this.job = job;
        }

        @Override
        public void run() {
            try {
                job.run();
            } finally {
                synchronized (executor.counter) {
                    executor.counter.decrementAndGet();
                    executor.counter.notifyAll();
                }
            }
        }
    }

    public int limit() {
        return threads;
    }

    public void shutdown() {
        service.shutdown();
        try {
            service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void addJob(Runnable job) {
        try {
            service.execute(new JobWrapper(this, job));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

}
like image 573
qrtt1 Avatar asked Aug 08 '13 06:08

qrtt1


1 Answers

There are two ways that this can happen. You can have too many runnables queued up waiting to run, or too many threads running at the same time. If there are too many jobs queued up, you can use a fixed size BlockingQueue in the ExecutorService to limit the number of items that can be queued up. Then when you try to queue a new task, the operation will block until there is room in the queue.

If there are too many threads running at once, you can limited how many threads are available to run tasks in the ExecutorService by calling Executors.newFixedThreadPool with the number of threads you want.

like image 109
Jonathon Thorndycraft Avatar answered Sep 20 '22 23:09

Jonathon Thorndycraft