TL;DR: When several CompletableFutures are waiting to be executed, how can I prioritize those whose values I'm interested in?
I have a list of 10,000 CompletableFutures (which calculate the data rows for an internal report over the product database):
List<Product> products = ...;
List<CompletableFuture<DataRow>> dataRows = products
    .stream()
    .map(p -> CompletableFuture.supplyAsync(() -> calculateDataRowForProduct(p), singleThreadedExecutor))
    .collect(Collectors.toList());
Each takes around 50ms to complete, so the whole batch finishes in about 500sec (they all share the same DB connection, so they cannot run in parallel).
Let's say I want to access the data row of the 9000th product:
dataRows.get(9000).join()
The problem is, all these CompletableFutures are executed in the order they have been created, not in the order they are accessed. Which means I have to wait 450sec for it to calculate stuff that at the moment I don't care about, to finally get to the data row I want.
Question: Is there any way to change this behaviour, so that the Futures I try to access get priority over those I don't care about at the moment?
First thoughts:
I noticed that a ThreadPoolExecutor uses a BlockingQueue<Runnable> to queue up entries waiting for an available Thread. So I thought about using a PriorityBlockingQueue, to change the priority of the Runnable when I access its CompletableFuture, but:
- PriorityBlockingQueue does not have a method to reprioritize an existing element, and
- I have not found a way to get from a CompletableFuture to the corresponding Runnable entry in the queue.
Before I go further down this road, do you think this sounds like the correct approach? Have others ever had this kind of requirement? I tried to search for it, but found exactly nothing. Maybe CompletableFuture is not the correct way of doing this?
Background: We have an internal report which displays 100 products per page. Initially we precalculated all DataRows for the report, which took way too long if someone has that many products.
So first optimization was to wrap the calculation in a memoized supplier:
List<Supplier<DataRow>> dataRows = products
    .stream()
    .map(p -> Suppliers.memoize(() -> calculateDataRowForProduct(p)))
    .collect(Collectors.toList());
This means that initial display of first 100 entries now takes 5sec instead of 500sec (which is great), but when the user switches to the next pages, it takes another 5sec for each single one of them.
So the idea is, while the user is staring at the first screen, why not precalculate the next pages in the background. Which leads me to my question above.
You could avoid submitting all of the tasks to the executor at the start, instead only submit one background task and when it finishes submit the next. If you want to get the 9000th row submit it immediately (if it has not already been submitted):
static class FutureDataRow {
    CompletableFuture<DataRow> future;
    int index;
    List<FutureDataRow> list;
    Product product;

    // Registers itself in the shared list; its position there becomes its index.
    FutureDataRow(List<FutureDataRow> list, Product product){
        this.list = list;
        index = list.size();
        list.add(this);
        this.product = product;
    }

    // Foreground access: submit this row right away if nobody has yet, then wait for it.
    public DataRow get(){
        submit();
        return future.join();
    }

    // Submits the calculation at most once.
    private synchronized void submit(){
        if(future == null) future = CompletableFuture.supplyAsync(() ->
            calculateDataRowForProduct(product), singleThreadedExecutor);
    }

    // Background chain: when this row completes, start the next one.
    private void background(){
        submit();
        if(index >= list.size() - 1) return;
        future.whenComplete((dr, t) -> list.get(index + 1).background());
    }
}
...
List<FutureDataRow> dataRows = new ArrayList<>();
products.forEach(p -> new FutureDataRow(dataRows, p));
dataRows.get(0).background();
If you want, you could also submit the next row inside the get method if you expect that the user will navigate to the next page afterwards.
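As a rough illustration of that idea, here is a standalone sketch rather than a patch to FutureDataRow. The class PrefetchingRows and its method names are made up for this example, and the executor and the calculation function are passed in as assumptions. A row is only submitted when somebody asks for it, and get also queues the following row behind it.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

// Hypothetical helper: lazily submits rows and prefetches the next one on access.
class PrefetchingRows<P, R> {
    private final List<P> inputs;
    private final Function<P, R> compute;
    private final Executor executor;
    // Only rows that have been requested (or prefetched) get an entry here.
    private final Map<Integer, CompletableFuture<R>> futures = new HashMap<>();

    PrefetchingRows(List<P> inputs, Function<P, R> compute, Executor executor) {
        this.inputs = inputs;
        this.compute = compute;
        this.executor = executor;
    }

    // Submits the row at most once and returns its future.
    private synchronized CompletableFuture<R> submit(int index) {
        return futures.computeIfAbsent(index, i ->
            CompletableFuture.supplyAsync(() -> compute.apply(inputs.get(i)), executor));
    }

    // Submits the requested row first, then the next one, and waits only for the requested row.
    R get(int index) {
        CompletableFuture<R> requested = submit(index);
        if (index + 1 < inputs.size()) {
            submit(index + 1);
        }
        return requested.join();
    }

    synchronized boolean isSubmitted(int index) {
        return futures.containsKey(index);
    }
}
```

Because the requested row is queued before the prefetched one, a single-threaded executor still serves the caller first. Prefetching a whole page instead of one row would be a small change to the loop in get.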
If you were instead using a multithreaded executor and you wanted to run multiple background tasks concurrently you could modify the background method to find the next unsubmitted task in the list and start it when the current background task has finished.
private synchronized boolean background(){
    if(future != null) return false;
    submit();
    future.whenComplete((dr, t) -> {
        for(int i = index + 1; i < list.size(); i++){
            if(list.get(i).background()) return;
        }
    });
    return true;
}
You would also need to start the first n tasks in the background instead of just the first one.
int n = 8; //number of active background tasks
for(int i = 0; i < dataRows.size() && n > 0; i++){
    if(dataRows.get(i).background()) n--;
}
Interesting problem :)
One way is to roll out a custom FutureTask class to facilitate changing priorities of tasks dynamically.
DataRow and Product are both taken as just String here for simplicity.
import java.util.*;
import java.util.concurrent.*;

public class Testing {
    private static String calculateDataRowForProduct(String product) {
        try {
            // Dummy operation.
            Thread.sleep(200);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Computation done for " + product);
        return "data row for " + product;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        PriorityBlockingQueue<Runnable> customQueue = new PriorityBlockingQueue<>(1, new CustomRunnableComparator());
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, customQueue);
        List<String> products = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            products.add("product" + i);
        }
        Map<Integer, PrioritizedFutureTask<String>> taskIndexMap = new HashMap<>();
        for (int i = 0; i < products.size(); i++) {
            String product = products.get(i);
            Callable<String> callable = () -> calculateDataRowForProduct(product);
            // Initial priority is the index, so tasks run in creation order by default.
            PrioritizedFutureTask<String> dataRowFutureTask = new PrioritizedFutureTask<>(callable, i);
            taskIndexMap.put(i, dataRowFutureTask);
            executor.execute(dataRowFutureTask);
        }
        List<Integer> accessOrder = new ArrayList<>();
        accessOrder.add(4);
        accessOrder.add(7);
        accessOrder.add(2);
        accessOrder.add(9);
        int priority = -1 * accessOrder.size();
        for (Integer nextIndex : accessOrder) {
            PrioritizedFutureTask<String> taskAtIndex = taskIndexMap.get(nextIndex);
            // The removal must not live inside an assert: assertions are disabled by default,
            // so it would be skipped. Re-queue only if the task was still waiting.
            if (customQueue.remove(taskAtIndex)) {
                customQueue.offer(taskAtIndex.set_priority(priority++));
            }
            // Now this task will be at the front of the thread pool queue.
            // Hence this task will execute next.
        }
        for (Integer nextIndex : accessOrder) {
            PrioritizedFutureTask<String> dataRowFutureTask = taskIndexMap.get(nextIndex);
            String dataRow = dataRowFutureTask.get();
            System.out.println("Data row for index " + nextIndex + " = " + dataRow);
        }
        // Let the remaining queued tasks finish, then allow the JVM to exit.
        executor.shutdown();
    }
}

class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask<T>> {
    private Integer _priority = 0;

    public PrioritizedFutureTask(Callable<T> callable, Integer priority) {
        super(callable);
        _priority = priority;
    }

    public Integer get_priority() {
        return _priority;
    }

    // Only call this while the task is NOT in the queue, otherwise the heap order breaks.
    public PrioritizedFutureTask<T> set_priority(Integer priority) {
        _priority = priority;
        return this;
    }

    @Override
    public int compareTo(PrioritizedFutureTask<T> other) {
        if (other == null) {
            throw new NullPointerException();
        }
        return get_priority().compareTo(other.get_priority());
    }
}

class CustomRunnableComparator implements Comparator<Runnable> {
    @Override
    public int compare(Runnable task1, Runnable task2) {
        // Lower priority value runs first.
        return Integer.compare(
            ((PrioritizedFutureTask<?>) task1).get_priority(),
            ((PrioritizedFutureTask<?>) task2).get_priority());
    }
}
Output:
Computation done for product0
Computation done for product4
Data row for index 4 = data row for product4
Computation done for product7
Data row for index 7 = data row for product7
Computation done for product2
Data row for index 2 = data row for product2
Computation done for product9
Data row for index 9 = data row for product9
Computation done for product1
Computation done for product3
Computation done for product5
Computation done for product6
Computation done for product8
There is one more possible optimization here.
The customQueue.remove(taskAtIndex) operation has O(n) time complexity with respect to the size of the queue (or the total number of products).
It might not matter much if the number of products is small (<= 10^5), but it might become a performance issue otherwise.
One solution is to extend PriorityBlockingQueue and add functionality to remove an element from the priority queue in O(log n) rather than O(n).
We can achieve that by keeping a hash map inside the priority queue structure. This hash map maps each element to its index (or indices, in case of duplicates) in the underlying array.
Fortunately, I had already implemented such a heap in Python some time back.
If you have more questions on this optimization, it's probably better to ask a new question altogether.
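To make the hash map idea concrete, here is a minimal Java sketch of a binary min-heap that tracks each element's array position, so removing an arbitrary element is O(log n). It is not the Python implementation mentioned above. IndexedMinHeap and its methods are names invented for this example; it assumes distinct elements, and it is neither thread-safe nor blocking, so it would still need locking before it could stand in for a PriorityBlockingQueue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: min-heap by int priority with a position map for fast removal.
class IndexedMinHeap<T> {
    private final List<T> items = new ArrayList<>();
    private final List<Integer> priorities = new ArrayList<>();
    private final Map<T, Integer> position = new HashMap<>(); // element -> index in items

    int size() { return items.size(); }

    void add(T item, int priority) {
        if (position.containsKey(item)) throw new IllegalArgumentException("duplicate element");
        items.add(item);
        priorities.add(priority);
        position.put(item, items.size() - 1);
        siftUp(items.size() - 1);
    }

    // Removes and returns the element with the smallest priority, or null if empty.
    T poll() {
        if (items.isEmpty()) return null;
        T top = items.get(0);
        removeAt(0);
        return top;
    }

    // O(log n): the map gives the index directly, no linear scan needed.
    boolean remove(T item) {
        Integer i = position.get(item);
        if (i == null) return false;
        removeAt(i);
        return true;
    }

    void reprioritize(T item, int newPriority) {
        if (remove(item)) add(item, newPriority);
    }

    private void removeAt(int i) {
        int last = items.size() - 1;
        swap(i, last);                      // move the victim to the end
        position.remove(items.remove(last));
        priorities.remove(last);
        if (i < last) {                     // restore heap order for the element moved into i
            siftUp(i);
            siftDown(i);
        }
    }

    private void siftUp(int i) {
        while (i > 0) {
            int parent = (i - 1) / 2;
            if (priorities.get(i) >= priorities.get(parent)) break;
            swap(i, parent);
            i = parent;
        }
    }

    private void siftDown(int i) {
        int n = items.size();
        while (true) {
            int smallest = i, left = 2 * i + 1, right = 2 * i + 2;
            if (left < n && priorities.get(left) < priorities.get(smallest)) smallest = left;
            if (right < n && priorities.get(right) < priorities.get(smallest)) smallest = right;
            if (smallest == i) return;
            swap(i, smallest);
            i = smallest;
        }
    }

    private void swap(int a, int b) {
        T itemA = items.get(a), itemB = items.get(b);
        int priorityA = priorities.get(a), priorityB = priorities.get(b);
        items.set(a, itemB);
        items.set(b, itemA);
        priorities.set(a, priorityB);
        priorities.set(b, priorityA);
        position.put(itemB, a);
        position.put(itemA, b);
    }
}
```

With this structure, moving a task to the front is a reprioritize call with a priority lower than anything queued, instead of the O(n) remove followed by offer.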
To answer my own question...
There is a surprisingly simple (and surprisingly boring) solution to my problem. I have no idea why it took me three days to find it, I guess it required the right mindset, that you only have when walking along an endless tranquilizing beach looking into the sunset on a quiet Sunday evening.
So, ah, it's a little bit embarrassing to write this, but when I need to fetch a certain value (say for the 9000th product), and the future has not yet computed that value, then instead of somehow forcing the future to produce that value asap (by doing all this reprioritization and scheduling magic), I can, well, I can, ... simply ... compute that value myself! Yes! Wait, what? Seriously, that's it?
It's something like this: if (!future.isDone()) {future.complete(supplier.get());}
I just need to store the original Supplier alongside the CompletableFuture in some wrapper class. This is the wrapper class, which works like a charm; all it needs is a better name:
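public static class FuturizedMemoizedSupplier<T> implements Supplier<T> {
    private final CompletableFuture<T> future;
    private volatile Supplier<T> supplier;

    // The constructor name must match the class name, otherwise this does not compile.
    public FuturizedMemoizedSupplier(Supplier<T> supplier) {
        this.supplier = supplier;
        this.future = CompletableFuture.supplyAsync(supplier, singleThreadedExecutor);
    }

    @Override
    public T get() {
        // Copy to a local so a concurrent get() that nulls the field cannot cause an NPE here.
        Supplier<T> s = supplier;
        // if the future is not yet completed, we just calculate the value ourselves, and set it into the future
        if (s != null && !future.isDone()) {
            future.complete(s.get());
        }
        supplier = null; // no longer needed once the future holds the value
        return future.join();
    }
}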
Now, I think, there is a small chance for a race condition here, which could lead to the supplier being executed twice. But actually, I don't care, it produces the same value anyway.
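If the double execution ever did matter (for example because the calculation is expensive or has side effects), one possible variant is to let the caller and the background task race for an atomic flag, so that only the winner runs the supplier. This is a sketch under assumptions, not the code from this answer: ClaimOnceSupplier and ClaimOnceDemo are made-up names, and the executor is passed in instead of being a shared field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical variant: whoever claims the flag first computes the value, everyone else waits.
class ClaimOnceSupplier<T> implements Supplier<T> {
    private final CompletableFuture<T> future = new CompletableFuture<>();
    private final AtomicBoolean claimed = new AtomicBoolean(false);
    private final Supplier<T> supplier;

    ClaimOnceSupplier(Supplier<T> supplier, Executor executor) {
        this.supplier = supplier;
        // Queue the background attempt; it becomes a no-op if a caller gets there first.
        executor.execute(this::computeIfUnclaimed);
    }

    private void computeIfUnclaimed() {
        if (claimed.compareAndSet(false, true)) {
            try {
                future.complete(supplier.get());
            } catch (Throwable t) {
                future.completeExceptionally(t);
            }
        }
    }

    @Override
    public T get() {
        computeIfUnclaimed();   // compute in the calling thread if nobody has started yet
        return future.join();   // otherwise wait for whoever claimed it
    }
}

class ClaimOnceDemo {
    // Builds 5 rows, asks for the last one first, and returns how often the suppliers ran.
    static int countComputations() {
        AtomicInteger calls = new AtomicInteger();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            List<ClaimOnceSupplier<String>> rows = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                int index = i;
                rows.add(new ClaimOnceSupplier<>(() -> {
                    calls.incrementAndGet();
                    return "row " + index;
                }, executor));
            }
            rows.get(4).get();
            for (ClaimOnceSupplier<String> row : rows) row.get();
            return calls.get();
        } finally {
            executor.shutdown();
        }
    }
}
```

The trade-off is that a caller who loses the race waits for the task that is already running, which costs at most one calculation's worth of time.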
Afterthoughts:
I have no idea why I didn't think of this earlier. I was completely fixated on the idea that it has to be the CompletableFuture which calculates the value, and it has to run in one of these background threads, and whatnot, and, well, none of these mattered or were in any way a requirement.
I think this whole question is a classic example of "Ask what problem you really want to solve, instead of coming up with a half-baked broken solution and asking how to fix that". In the end, I didn't care about CompletableFuture or any of its features at all; it was just the easiest way that came to my mind to run something in the background.
Thanks for your help!