
How do I use CompletableFuture.supplyAsync together with PriorityBlockingQueue?

I'm trying to add a priority queue to an existing application that uses a ThreadPoolExecutor with a LinkedBlockingQueue via CompletableFuture.supplyAsync. The problem is that I can't come up with a design for assigning task priorities that I can then read in the PriorityBlockingQueue's Comparator. My task gets wrapped by CompletableFuture in an instance of a non-public nested class called AsyncSupply, which hides the original task in a private field. The Comparator then gets called with these AsyncSupply objects cast to Runnable, as follows:

import java.util.Comparator;

public class PriorityComparator<T extends Runnable> implements Comparator<T> {

    @Override
    public int compare(T o1, T o2) {

        // o1 and o2 are AsyncSupply objects here,
        // but I want something I can assign priorities to!
        return 0;
    }
}

I investigated the possibility of extending CompletableFuture so I can wrap it in a different object, but so much of CompletableFuture is encapsulated and uninheritable that extending it doesn't seem like an option. Nor is encapsulating it within an adapter, as it implements a very wide interface.

I'm not sure how to approach this problem aside from copying the entire CompletableFuture, and modifying it. Any ideas?

jacob asked Jan 19 '16 00:01



1 Answer

It seems like a limitation in the API that CompletableFuture doesn't provide a straightforward way to use PriorityBlockingQueue. Fortunately, we can hack around it. In Oracle's JDK 1.8, the wrapper classes (AsyncSupply, UniApply, and so on) all happen to store the wrapped task in a field named fn, so our priority-aware objects can be extracted with a little reflection, although this relies on an implementation detail rather than a documented API:

import java.lang.reflect.Field;
import java.util.Comparator;

public class CFRunnableComparator implements Comparator<Runnable> {

    @Override
    @SuppressWarnings("unchecked")
    public int compare(Runnable r1, Runnable r2) {
        // r1 and r2 might be AsyncSupply, UniApply, etc., but we want to
        // compare the original priority-aware objects they wrap.
        return ((Comparable) unwrap(r1)).compareTo(unwrap(r2));
    }

    private Object unwrap(Runnable r) {
        try {
            Field field = r.getClass().getDeclaredField("fn");
            field.setAccessible(true);
            // NB: For performance-intensive contexts, you may want to
            // cache these in a ConcurrentHashMap<Class<?>, Field>.
            return field.get(r);
        } catch (IllegalAccessException | NoSuchFieldException e) {
            throw new IllegalArgumentException("Couldn't unwrap " + r, e);
        }
    }
}
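
The caching hinted at in the NB comment could look roughly like the sketch below. FnFieldCache and DemoWrapper are hypothetical names introduced here for illustration; DemoWrapper only stands in for a JDK wrapper such as AsyncSupply so that the sketch runs without touching JDK internals. One assumption to verify on your own runtime: newer JDKs with strong encapsulation may refuse setAccessible on java.util.concurrent internals unless the package is opened, for example with --add-opens java.base/java.util.concurrent=ALL-UNNAMED.

```java
import java.lang.reflect.Field;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for a JDK wrapper such as AsyncSupply: it hides
// the wrapped object in a private field named "fn".
final class DemoWrapper {
    private final Object fn;

    DemoWrapper(Object fn) {
        this.fn = fn;
    }
}

// Hypothetical helper: resolves the "fn" field once per wrapper class
// and reuses the accessible Field on later calls.
final class FnFieldCache {
    private static final ConcurrentMap<Class<?>, Field> FIELDS =
            new ConcurrentHashMap<>();

    private FnFieldCache() {}

    static Object unwrap(Object wrapper) {
        // computeIfAbsent records nothing if lookup() throws.
        Field field = FIELDS.computeIfAbsent(wrapper.getClass(), FnFieldCache::lookup);
        try {
            return field.get(wrapper);
        } catch (IllegalAccessException e) {
            throw new IllegalArgumentException("Couldn't unwrap " + wrapper, e);
        }
    }

    private static Field lookup(Class<?> type) {
        try {
            Field field = type.getDeclaredField("fn");
            field.setAccessible(true);
            return field;
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No fn field on " + type, e);
        }
    }
}
```

The comparator's unwrap method could then delegate to FnFieldCache.unwrap(r) instead of performing the reflective lookup on every comparison.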

This assumes that your Supplier class is Comparable, something like:

public interface WithPriority extends Comparable<WithPriority> {
    int priority();
    @Override
    default int compareTo(WithPriority o) {
        // Reverse comparison so higher priority comes first.
        return Integer.compare(o.priority(), priority());
    }
}

public class PrioritySupplier<T> implements Supplier<T>, WithPriority {
    private final int priority;
    private final Supplier<T> supplier;
    public PrioritySupplier(int priority, Supplier<T> supplier) {
        this.priority = priority;
        this.supplier = supplier;
    }
    @Override
    public T get() {
        return supplier.get();
    }
    @Override
    public int priority() {
        return priority;
    }
}
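
To see what the reversed comparison does in practice, here is a small sketch that puts WithPriority items straight into a PriorityBlockingQueue and polls them back out. PriorityOrderDemo and pollOrder are hypothetical names used only for this illustration, and WithPriority is repeated so the sketch compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Repeated from above so the sketch is self-contained.
interface WithPriority extends Comparable<WithPriority> {
    int priority();

    @Override
    default int compareTo(WithPriority o) {
        // Reverse comparison so higher priority comes first.
        return Integer.compare(o.priority(), priority());
    }
}

// Hypothetical demo class, not part of the original answer.
final class PriorityOrderDemo {
    private PriorityOrderDemo() {}

    // Enqueues one item per priority, then returns the priorities
    // in the order the queue hands them back.
    static List<Integer> pollOrder(int... priorities) {
        PriorityBlockingQueue<WithPriority> queue = new PriorityBlockingQueue<>();
        for (int p : priorities) {
            // priority() is the only abstract method, so a lambda works.
            queue.add(() -> p);
        }
        List<Integer> order = new ArrayList<>();
        WithPriority next;
        while ((next = queue.poll()) != null) {
            order.add(next.priority());
        }
        return order;
    }
}
```

Items with equal priority come out in no guaranteed order, since PriorityBlockingQueue does not promise any ordering among ties.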

Used as follows:

PriorityBlockingQueue<Runnable> q = new PriorityBlockingQueue<>(11 /*default*/,
        new CFRunnableComparator());
ThreadPoolExecutor pool = new ThreadPoolExecutor(..., q);
CompletableFuture.supplyAsync(new PrioritySupplier<>(n, () -> {
    ...
}), pool);

If you create classes like PriorityFunction and PriorityBiConsumer, you can use this same technique to call methods like thenApplyAsync and whenCompleteAsync with appropriate priorities as well.
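
A PriorityFunction along those lines might be sketched as follows. The answer only names the class, so its shape here is an assumption modelled on PrioritySupplier; WithPriority is repeated so the sketch compiles on its own.

```java
import java.util.function.Function;

// Repeated from above so the sketch is self-contained.
interface WithPriority extends Comparable<WithPriority> {
    int priority();

    @Override
    default int compareTo(WithPriority o) {
        // Reverse comparison so higher priority comes first.
        return Integer.compare(o.priority(), priority());
    }
}

// Assumed shape of PriorityFunction, mirroring PrioritySupplier:
// it delegates apply() and carries a priority for the comparator.
final class PriorityFunction<T, R> implements Function<T, R>, WithPriority {
    private final int priority;
    private final Function<T, R> function;

    PriorityFunction(int priority, Function<T, R> function) {
        this.priority = priority;
        this.function = function;
    }

    @Override
    public R apply(T input) {
        return function.apply(input);
    }

    @Override
    public int priority() {
        return priority;
    }
}
```

It would be passed where a plain Function is expected, for example future.thenApplyAsync(new PriorityFunction<String, Integer>(n, String::length), pool), with the type arguments spelled out to keep inference simple.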

Brandon answered Nov 14 '22 04:11