Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to manage M threads (1 per task) ensuring only N threads at the same time. With N < M. In Java

I have a queue of task in java. This queue is in a table in the DB.

I need to:

  • 1 thread per task only
  • No more than N threads running at the same time. This is because the threads have DB interaction and I don't want have a bunch of DB connections opened.

I think I could do something like:

final Semaphore semaphore = new Semaphore(N);
while (isOnJob) {
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        final CountDownLatch cdl = new CountDownLatch(tasks.size());
        for (final JobTask task : tasks) {
            Thread tr = new Thread(new Runnable() {

                @Override
                public void run() {
                    semaphore.acquire();
                    task.doWork();
                    semaphore.release();
                    cdl.countDown();
                }

            });
        }
        cdl.await();
    }
}

I know that an ExecutorService class exists, but I'm not sure if it I can use it for this.

So, do you think that this is the best way to do this? Or could you clarify me how the ExecutorService works in order to solve this?

final solution:

I think the best solution is something like:

while (isOnJob) {
    ExecutorService executor = Executors.newFixedThreadPool(N);
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        for (final JobTask task : tasks) {
            executor.submit(new Runnable() {

                @Override
                public void run() {
                    task.doWork();
                }

            });
        }
    }
    executor.shutdown();
    executor.awaitTermination(Long.MAX_VALUE, TimeUnit.HOURS);
}

Thanks a lot for the awnsers. BTW I am using a connection pool, but the queries to the DB are very heavy and I don't want to have uncontrolled number of task at the same time.

like image 440
user2427 Avatar asked Sep 12 '09 18:09

user2427


2 Answers

You can indeed use an ExecutorService. For instance, create a new fixed thread pool using the newFixedThreadPool method. This way, besides caching threads, you also guarantee that no more than n threads are running at the same time.

Something along these lines:

private static final ExecutorService executor = Executors.newFixedThreadPool(N);
// ...
while (isOnJob) {
    List<JobTask> tasks = getJobTasks();
    if (!tasks.isEmpty()) {
        List<Future<?>> futures = new ArrayList<Future<?>>();
        for (final JobTask task : tasks) {
                Future<?> future = executor.submit(new Runnable() {    
                        @Override
                        public void run() {
                                task.doWork();
                        }
                });
                futures.add(future);
        }
        // you no longer need to use await
        for (Future<?> fut : futures) {
          fut.get();
        }
    }
}

Note that you no longer need to use the latch, as get will wait for the computation to complete, if necessary.

like image 95
João Silva Avatar answered Nov 15 '22 03:11

João Silva


I agree with JG that ExecutorService is the way to go... but I think you're both making it more complicated than it needs to be.

Rather than creating a large number of threads (1 per task) why not just create a fixed sized thread pool (with Executors.newFixedThreadPool(N)) and submit all the tasks to it? No need for a semaphore or anything like that - just submit the jobs to the thread pool as you get them, and the thread pool will handle them with up to N threads at a time.

If you aren't going to use more than N threads at a time, why would you want to create them?

like image 39
Jon Skeet Avatar answered Nov 15 '22 03:11

Jon Skeet