I use an ExecutorService to run many tasks on different threads. Sometimes, too many Runnable instances waiting in the thread pool's queue cause an OutOfMemoryError.
I tried to write a blocking job executor to solve this. Is there a built-in (official) solution for it?
For example:
BlockingJobExecutor executor = new BlockingJobExecutor(3);
for (int i = 0; i < 1000; i++) {
    executor.addJob(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            LogFactory.getLog(BTest.class).info("test " + System.currentTimeMillis());
        }
    });
}
executor.shutdown();
Here is the BlockingJobExecutor class:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingJobExecutor {

    // Number of jobs currently wrapped but not yet finished.
    AtomicInteger counter = new AtomicInteger();
    ExecutorService service;
    int threads;

    public BlockingJobExecutor(int threads) {
        if (threads < 1) {
            throw new IllegalArgumentException("threads must be at least 1.");
        }
        service = Executors.newFixedThreadPool(threads);
        this.threads = threads;
    }

    static class JobWrapper implements Runnable {
        BlockingJobExecutor executor;
        Runnable job;

        public JobWrapper(BlockingJobExecutor executor, Runnable job) throws InterruptedException {
            // Block the submitting thread until a slot is free, then claim it.
            synchronized (executor.counter) {
                while (executor.counter.get() >= executor.limit()) {
                    executor.counter.wait();
                }
                executor.counter.incrementAndGet();
            }
            this.executor = executor;
            this.job = job;
        }

        @Override
        public void run() {
            try {
                job.run();
            } finally {
                // Release the slot and wake up any blocked submitters.
                synchronized (executor.counter) {
                    executor.counter.decrementAndGet();
                    executor.counter.notifyAll();
                }
            }
        }
    }

    public int limit() {
        return threads;
    }

    public void shutdown() {
        service.shutdown();
        try {
            service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void addJob(Runnable job) {
        try {
            service.execute(new JobWrapper(this, job));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}
There are two ways this can happen: too many runnables queued up waiting to run, or too many threads running at the same time. If too many jobs are being queued, you can give the ExecutorService a fixed-size BlockingQueue to limit the number of tasks that can wait. One detail to be aware of: by default a ThreadPoolExecutor does not block when its queue is full, it throws RejectedExecutionException, so to make submission block until there is room you also need a RejectedExecutionHandler that waits for space (see the sketch below).
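A minimal sketch of that approach, with arbitrary sizes (3 threads, queue capacity 10) and an illustrative class name; the handler falls back to the queue's blocking put() when the pool rejects a task:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BoundedQueueExample {
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3,                                 // at most 3 worker threads
                0L, TimeUnit.MILLISECONDS,            // keep-alive (unused when core == max)
                new ArrayBlockingQueue<Runnable>(10), // at most 10 tasks may wait
                new RejectedExecutionHandler() {
                    // The default policy throws RejectedExecutionException when
                    // the queue is full; block the submitter instead.
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        try {
                            executor.getQueue().put(r); // blocks until there is room
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new RejectedExecutionException(e);
                        }
                    }
                });

        for (int i = 0; i < 1000; i++) {
            // execute() now blocks (inside the handler) once 10 tasks are waiting
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }

        pool.shutdown();
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }
}

One caveat with the getQueue().put() pattern: if the pool is shut down while a submitter is blocked, the re-queued task may never run, so it fits best with the submit-everything-then-shutdown flow used in the question.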
If too many threads are running at once, you can limit how many threads are available to run tasks by creating the pool with Executors.newFixedThreadPool and the number of threads you want. Note that this factory method backs the pool with an unbounded queue, so it caps concurrency but not the number of waiting tasks.
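A short sketch of that second point (the class name and counts are arbitrary); at most three tasks ever run at the same time, while the rest wait in the pool's internal queue:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedPoolExample {
    public static void main(String[] args) throws InterruptedException {
        // At most 3 tasks run concurrently; further tasks wait in the
        // pool's internal (unbounded) queue until a worker is free.
        ExecutorService service = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int n = i;
            service.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println("task " + n + " on " + Thread.currentThread().getName());
                }
            });
        }
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }
}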