I'm using ThreadPoolTaskExecutor (of spring) in order to execute some tasks asynchronously.
The required task will load some object from an external DB into my system memory. I'm using max thread pool size of 10 and max queue size of 100.
Suppose that all 10 threads are occupied getting objects from my DB and a task is created, it will go to the queue. Now another task is created that should get the same object (same key in DB) from the DB, it will also go to the queue (assuming all 10 threads are still occupied).
So my queue might get full easily with duplicated tasks which will get executed in turn and I don't want this to happen.
I thought that a solution should come in the form of a unique collection which serves as the thread pool queue. Under the hood ThreadPoolTaskExecutor uses LinkedBlockingQueue which does not provide uniqueness.
I thought of a few possible solutions but none satisfies me:
This led me to try and extend LinkedBlockingQueue and override add:
public boolean add(E e)
if(!this.contains(e)) {
return super.add(e);
} else {
return false;
}
}
But as far as I can tell this will lead to a major performance reduction since the contains
method is limited by O(n) - bad idea.
What could solve my problem? I'm aiming for a good performance (in case of memory-performance trade offs I don't mind giving up memory for performance).
Using Guava and ListenableFuture you could do something like that (haven't tested)
Set<String> uniqueQueue = Sets.newConcurrentHashSet();
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, Queues.newLinkedBlockingQueue(100));
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
String t1 = "abc";
if(uniqueQueue.add(t1)) {
ListenableFuture<String> future = executorService.submit(() -> "do something with " + t1);
Futures.addCallback(future, new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
uniqueQueue.remove(t1);
}
@Override
public void onFailure(Throwable t) {
uniqueQueue.remove(t1);
}
});
}
resulting in
uniqueQueue
)uniqueQueue
this implementation does not handle
Exceptions
thrown by the submit()
methodunqiueQueue
With reference to your requirement of loading objects from a database into memory, you might want to take a look at Guava's Caches.
UPDATE:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With