
Thread pool queue with unique tasks

I'm using Spring's ThreadPoolTaskExecutor to execute some tasks asynchronously.

Each task loads an object from an external DB into my system's memory. I'm using a max thread pool size of 10 and a max queue size of 100.

Suppose all 10 threads are busy fetching objects from the DB when a new task is created: it goes to the queue. Now another task is created that should fetch the same object (same key in the DB). Assuming all 10 threads are still busy, it also goes to the queue.

So my queue can easily fill up with duplicate tasks, each of which will eventually be executed, and I don't want that to happen.

I thought that a solution should come in the form of a unique collection which serves as the thread pool queue. Under the hood ThreadPoolTaskExecutor uses LinkedBlockingQueue which does not provide uniqueness.

I thought of a few possible solutions but none satisfies me:

  • Using ThreadPoolExecutor instead of ThreadPoolTaskExecutor. ThreadPoolExecutor provides a constructor that lets me choose the thread pool queue type, but the queue must implement the BlockingQueue interface. I could not find an implementation that preserves uniqueness.

This led me to try and extend LinkedBlockingQueue and override add:

@Override
public boolean add(E e) {
    if (!this.contains(e)) {
        return super.add(e);
    } else {
        return false;
    }
}

But as far as I can tell this will lead to a major performance reduction, since the contains method is O(n). Bad idea.
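One way to avoid the O(n) scan is to keep a concurrent set alongside the queue, so the duplicate check is a hash lookup. The following is only a sketch under several assumptions: the class name UniqueLinkedBlockingQueue is made up for illustration, tasks must implement equals/hashCode on the DB key, and tasks must be passed to execute() rather than submit() (submit() wraps each task in a FutureTask, which compares by identity). It overrides offer() because that is the method ThreadPoolExecutor uses to enqueue work, not add(). Bulk operations such as clear() and drainTo() are deliberately left out and would leave the set stale.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical class: a bounded queue that refuses elements equal to one already queued.
class UniqueLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {
    private static final long serialVersionUID = 1L;

    // Mirrors the queue contents so the duplicate check avoids a linear scan.
    private final Set<E> queued = ConcurrentHashMap.newKeySet();

    UniqueLinkedBlockingQueue(int capacity) {
        super(capacity);
    }

    @Override
    public boolean offer(E e) {
        if (!queued.add(e)) {
            return false; // an equal element is already waiting
        }
        boolean accepted = super.offer(e);
        if (!accepted) {
            queued.remove(e); // queue was full: roll back the reservation
        }
        return accepted;
    }

    @Override
    public E take() throws InterruptedException {
        E e = super.take();
        queued.remove(e);
        return e;
    }

    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = super.poll(timeout, unit);
        if (e != null) {
            queued.remove(e);
        }
        return e;
    }

    @Override
    public E poll() {
        E e = super.poll();
        if (e != null) {
            queued.remove(e);
        }
        return e;
    }

    @Override
    public boolean remove(Object o) {
        boolean removed = super.remove(o);
        if (removed) {
            queued.remove(o);
        }
        return removed;
    }
}
```

Note that a ThreadPoolExecutor treats a refused offer() like a full queue, so with core size equal to max size a duplicate ends up in the RejectedExecutionHandler; something like ThreadPoolExecutor.DiscardPolicy would be needed to drop it silently. The sketch also only deduplicates tasks that are still waiting, not ones already running.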

What could solve my problem? I'm aiming for good performance (in case of memory-performance trade-offs, I don't mind giving up memory for performance).

forhas asked Mar 31 '15 12:03


1 Answer

Using Guava and ListenableFuture, you could do something like this (untested):

Set<String> uniqueQueue = Sets.newConcurrentHashSet();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, Queues.newLinkedBlockingQueue(100));
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);

String t1 = "abc";
if(uniqueQueue.add(t1)) {
    ListenableFuture<String> future = executorService.submit(() -> "do something with " + t1);
    Futures.addCallback(future, new FutureCallback<String>() {
        @Override
        public void onSuccess(String result) {
            uniqueQueue.remove(t1);
        }

        @Override
        public void onFailure(Throwable t) {
            uniqueQueue.remove(t1);
        }
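    }, MoreExecutors.directExecutor()); // newer Guava releases require passing the callback executor explicitly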
}

This results in the following:

  • Only items that are neither waiting in the queue nor currently being processed are submitted (they are tracked in uniqueQueue).
  • Items are removed from uniqueQueue once they have been processed, whether processing succeeded or failed.
  • You'll only have a maximum of 100 items in the queue.

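This implementation does not handle:

  • Exceptions thrown by the submit() method (for example a RejectedExecutionException when the queue is full, which would leave the key stuck in uniqueQueue)
  • A maximum number of items in uniqueQueue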
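For comparison, the same idea can be sketched with the JDK alone, this time also covering a rejected submission. This is a swapped-in variant, not the Guava code above: DedupingLoader, its load method, and the loader function are hypothetical names, and the in-flight map plays the role of uniqueQueue. A caller asking for a key that is already queued or running gets the existing future back instead of enqueueing a second task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;

// Hypothetical helper: at most one pending or running load per key.
class DedupingLoader<K, V> {
    private final ConcurrentMap<K, CompletableFuture<V>> inFlight = new ConcurrentHashMap<>();
    private final Executor executor;
    private final Function<K, V> loader; // e.g. the DB lookup (assumed)

    DedupingLoader(Executor executor, Function<K, V> loader) {
        this.executor = executor;
        this.loader = loader;
    }

    CompletableFuture<V> load(K key) {
        CompletableFuture<V> fresh = new CompletableFuture<>();
        CompletableFuture<V> existing = inFlight.putIfAbsent(key, fresh);
        if (existing != null) {
            return existing; // duplicate request: share the pending result
        }
        try {
            executor.execute(() -> {
                try {
                    fresh.complete(loader.apply(key));
                } catch (Throwable t) {
                    fresh.completeExceptionally(t);
                } finally {
                    inFlight.remove(key, fresh); // allow a later reload of this key
                }
            });
        } catch (RejectedExecutionException e) {
            // The pool refused the task: do not leave the key stuck in the map.
            inFlight.remove(key, fresh);
            fresh.completeExceptionally(e);
        }
        return fresh;
    }
}
```

With the pool from the question this might be used as new DedupingLoader<String, String>(threadPoolExecutor, key -> loadFromDb(key)), where loadFromDb stands in for the actual DB call. The map still has no size limit of its own, but it can never hold more keys than the pool has queued or running tasks.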

With reference to your requirement of loading objects from a database into memory, you might want to take a look at Guava's Caches.

UPDATE:

  • BlockingQueue backed by LinkedHashSet from Apache Marmotta project

marco.eig answered Oct 18 '22 21:10