
ThreadPoolExecutor with ArrayBlockingQueue

I started reading about ThreadPoolExecutor in the Javadoc because I am using it in one of my projects. Can anyone explain what this line actually means? I know what each parameter stands for, but I would like to understand it in more general, layman's terms from some of the experts here.

ExecutorService service = new ThreadPoolExecutor(10, 10, 1000L,
        TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10, true),
        new ThreadPoolExecutor.CallerRunsPolicy());

Update: the problem statement is:

Each thread uses a unique ID between 1 and 1000, and the program has to run for 60 minutes or more. Within those 60 minutes all the IDs may get used up, so I need to reuse them. Below is the program I wrote using the above executor.

class IdPool {
    private final LinkedList<Integer> availableExistingIds = new LinkedList<Integer>();

    public IdPool() {
        for (int i = 1; i <= 1000; i++) {
            availableExistingIds.add(i);
        }
    }

    public synchronized Integer getExistingId() {
        return availableExistingIds.removeFirst();
    }

    public synchronized void releaseExistingId(Integer id) {
        availableExistingIds.add(id);
    }
}


class ThreadNewTask implements Runnable {
    private IdPool idPool;

    public ThreadNewTask(IdPool idPool) {
        this.idPool = idPool;
    }

    public void run() {
        Integer id = idPool.getExistingId();
        someMethod(id);
        idPool.releaseExistingId(id);
    }

    // Does this method need to be synchronized or not?
    private synchronized void someMethod(Integer id) {
        System.out.println("Task: " + id);
        // and do whatever other calculations you need to do in your program
    }
}

public class TestingPool {
    public static void main(String[] args) throws InterruptedException {
        int size = 10;
        int durationOfRun = 60;
        IdPool idPool = new IdPool();   
        // create thread pool with given size
        ExecutorService service = new ThreadPoolExecutor(size, size, 500L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(size), new ThreadPoolExecutor.CallerRunsPolicy()); 

        // queue some tasks
        long startTime = System.currentTimeMillis();
        long endTime = startTime + (durationOfRun * 60 * 1000L);

        // Running it for 60 minutes
        while(System.currentTimeMillis() <= endTime) {
            service.submit(new ThreadNewTask(idPool));
        }

        // wait for termination        
        service.shutdown();
        service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 
    }
}

My question is: is this code right as far as performance is concerned? And what else can I do to make it more accurate? Any help will be appreciated.

asked May 26 '12 22:05 by arsenal




2 Answers

[First, I apologize: this is a response to a previous answer, but I wanted formatting.]

Except in reality, you DON'T block when an item is submitted to a ThreadPoolExecutor with a full queue. The reason is that ThreadPoolExecutor calls the BlockingQueue.offer(E e) method, which by definition is non-blocking: it either adds the item and returns true, or does not add it (when the queue is full) and returns false. If the queue is full and the pool is already at its maximum size, the ThreadPoolExecutor then calls the registered RejectedExecutionHandler to deal with the situation.
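To illustrate the non-blocking behaviour of offer described above, here is a minimal sketch (assuming Java 8+ for the lambdas; the class and method names are made up for this example). It fills a bounded ArrayBlockingQueue and then offers one more element:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original question or answer.
final class OfferDemo {

    // Fills a bounded queue to capacity, then reports whether one more offer() is accepted.
    static boolean offerToFullQueue(int capacity) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer(() -> { });   // each of these offers succeeds
        }
        // The queue is now full: offer() returns false immediately instead of waiting.
        return queue.offer(() -> { });
    }

    public static void main(String[] args) {
        System.out.println("offer on full queue accepted? " + offerToFullQueue(10));
        // prints: offer on full queue accepted? false
    }
}
```

By contrast, BlockingQueue.put would wait for a free slot, but ThreadPoolExecutor does not use put when enqueuing submitted tasks.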

From the javadoc:

Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.

By default, ThreadPoolExecutor.AbortPolicy is used, which throws a RejectedExecutionException from the "submit" or "execute" method of the ThreadPoolExecutor.

try {
   executorService.execute(new Runnable() { ... });
}
catch (RejectedExecutionException e) {
   // the queue is full, and you're using the AbortPolicy as the 
   // RejectedExecutionHandler
}

However, you can use other handlers to do something different, such as silently dropping the task (DiscardPolicy) or running it in the thread that called the "execute" or "submit" method (CallerRunsPolicy). This example lets whichever thread calls "submit" or "execute" run the requested task itself when the queue is full. (This means that at any given time you could have 1 additional task running on top of what's in the pool itself.)

ExecutorService service = new ThreadPoolExecutor(..., new ThreadPoolExecutor.CallerRunsPolicy());
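As a rough illustration of the CallerRunsPolicy behaviour described above, the following sketch saturates a deliberately tiny pool (1 thread, 1 queue slot; these sizes and all names are assumptions chosen for the demo, not taken from the question) and records which thread ends up running the overflow task:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the original answer.
final class CallerRunsDemo {

    // Returns the thread that ran the task submitted while the pool was saturated.
    static Thread threadThatRanOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            // Task 1 occupies the only worker thread until it is released.
            pool.execute(() -> {
                workerBusy.countDown();
                try {
                    release.await();
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            workerBusy.await();
            // Task 2 fills the single queue slot.
            pool.execute(() -> { });
            // Task 3 finds the queue full and the pool at its maximum, so it is
            // rejected and CallerRunsPolicy runs it right here on the calling thread.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while setting up the demo", ex);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean onCaller = threadThatRanOverflowTask() == Thread.currentThread();
        System.out.println("ran on caller thread? " + onCaller);
        // prints: ran on caller thread? true
    }
}
```

Because the submitting thread is busy running the task, it cannot submit anything else in the meantime, which acts as a simple form of back-pressure.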

If you want to block and wait, you could implement your own RejectedExecutionHandler that blocks until there is a slot available on the queue (this is a rough sketch; I have not compiled or run it, but you should get the idea):

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BlockUntilAvailableSlot implements RejectedExecutionHandler {
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
     boolean submitted = false;
     while (! submitted) {
       if (executor.isShutdown()) {
          // the pool no longer accepts work, so give up on this task
          return;
       }

       try {
          // Wait a little bit for a free slot, and try again if none shows up.
          // Note: calling executor.execute(r) here would only re-enter this
          // handler, so the task is offered to the queue directly instead.
          submitted = executor.getQueue().offer(r, 100L, TimeUnit.MILLISECONDS);
       }
       catch (InterruptedException ie) {
          // be a good citizen: restore the interrupt flag and silently return
          Thread.currentThread().interrupt();
          return;
       }
     }
  }
}
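For a sense of how such a blocking handler might be wired into a pool, here is a self-contained sketch. It uses a simpler variant of the idea that calls put on the executor's queue; this is my assumption for the demo, not the code above, and the pool sizes, task count, and all names are made up. One caveat of this variant is that a task enqueued while the pool is being shut down may never run.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original answer.
final class BlockingSubmitDemo {

    // Hypothetical handler: waits for a free queue slot instead of rejecting the task.
    static final class BlockOnFullQueue implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("executor has been shut down");
            }
            try {
                executor.getQueue().put(r); // blocks the submitting thread until space frees up
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("interrupted while waiting for a slot", ex);
            }
        }
    }

    // Submits taskCount short tasks to a small pool and returns how many completed.
    static int runTasks(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(2), new BlockOnFullQueue());
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(5); // simulate a little work so the queue fills up
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
                completed.incrementAndGet();
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pool", ex);
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runTasks(20));
        // prints: completed tasks: 20
    }
}
```

Unlike CallerRunsPolicy, the submitting thread here never runs a task itself; it only waits until a worker frees up a queue slot.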
answered Nov 02 '22 23:11 by Matt


It creates an ExecutorService backed by a pool of threads. Both the core and the maximum number of threads in the pool are 10 in this case. The keep-alive time of 1 second (1000 ms) is how long an idle thread above the core size waits before it is terminated; because the core and maximum sizes are the same, this never happens (once started, the 10 threads are kept around, and the pool never runs more than 10 threads).
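The effect of equal core and maximum sizes can be checked with a small sketch like the one below. The named local variables mirror the constructor's parameters; the keep-alive time is shortened to 100 ms purely so the demo finishes quickly, and the class and method names are assumptions of mine:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original answer.
final class PoolSizeDemo {

    // Starts all core threads, lets them sit idle past the keep-alive time,
    // and returns how many threads the pool still holds.
    static int poolSizeAfterIdle() {
        int corePoolSize = 10;      // threads kept alive even when idle
        int maximumPoolSize = 10;   // upper bound on threads; equal to core here
        long keepAliveTime = 100L;  // assumption: shortened from the question's 1000 ms
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(10, true),     // bounded, fair queue with 10 slots
                new ThreadPoolExecutor.CallerRunsPolicy());     // rejected tasks run on the caller
        try {
            pool.prestartAllCoreThreads();       // create all 10 worker threads up front
            Thread.sleep(3 * keepAliveTime);     // stay idle well past the keep-alive time
            return pool.getPoolSize();           // core threads do not time out by default
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while idling", ex);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("threads after idling: " + poolSizeAfterIdle());
        // prints: threads after idling: 10
    }
}
```

Core threads would only time out if allowCoreThreadTimeOut(true) were called on the pool, which the line in the question does not do.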

It uses a fair ArrayBlockingQueue with 10 slots to hold the execution requests. When the queue is full (10 tasks are waiting while all 10 threads are busy), the next submission is rejected rather than blocked.

A rejected task is handed to the CallerRunsPolicy, which runs the offered Runnable on the caller's thread. The caller is therefore kept busy until that task finishes, which slows down further submissions. If the service has been shut down, the task is discarded instead.

answered Nov 02 '22 23:11 by Francis Upton IV