
What are the ways to pass a ThreadPoolExecutor to CompletableFuture?

I have been working with Java's CompletableFuture lately and found that we should always use a custom thread pool with it. I found two ways of passing a thread pool to existing code, shown below.

This is my thread pool in the configuration file:

@Override
@Bean(name = "commonThreadPool")
public Executor getAsyncExecutor() {
  return new ThreadPoolTaskExecutor();
}

1. Passing the existing thread pool as an argument:

 @Autowired
 @Qualifier("commonThreadPool") 
 TaskExecutor existingThreadPool;       
 CompletableFuture.runAsync(() -> executeTask(), existingThreadPool);
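
For reference, option 1 can also be tried outside Spring with a plain ExecutorService. This is only a minimal sketch, not the configuration from the question: the class name ExplicitExecutorDemo, the thread-name prefix "common-pool-" and the pool size of 2 are assumptions made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class ExplicitExecutorDemo {

    // Runs one task through runAsync(Runnable, Executor) and returns
    // the name of the thread that executed it.
    static String runOnCustomPool() {
        AtomicInteger counter = new AtomicInteger();
        // Hypothetical pool: named daemon threads so the origin of a thread is visible.
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> {
            Thread t = new Thread(r, "common-pool-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
        try {
            AtomicReference<String> threadName = new AtomicReference<>();
            CompletableFuture<Void> future = CompletableFuture.runAsync(
                    () -> threadName.set(Thread.currentThread().getName()), pool);
            future.join(); // wait until the task has finished
            return threadName.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran on: " + runOnCustomPool());
    }
}
```

The executor argument only applies to this one call. Chained *Async stages without an explicit executor fall back to the default executor, which is why this approach means repeating the argument everywhere.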

2. Using @Async, like below:

@Async("commonThreadPool")
public void executeTask() {
// Execute Some Task
}

Is there a third way, where I can write a CompletableFuture handler or override its existing behaviour in a single place and pass the custom thread pool there? After that, wherever I use the code below, it should pick up my existing thread pool instead of the ForkJoin pool.

 CompletableFuture.runAsync(() -> executeTask());
Mayur asked May 19 '19 06:05



1 Answer

There is no standard way to replace the default executor for all CompletableFuture instances. But since Java 9, you can define a default executor for subclasses, e.g. with

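import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

public class MyCompletableFuture<T> extends CompletableFuture<T> {
    // Example executor: logs each submitted task and runs it on a new thread
    static final Executor EXEC = r -> {
        System.out.println("executing "+r);
        new Thread(r).start();
    };

    // Used by all *Async methods that are called without an explicit executor
    @Override
    public Executor defaultExecutor() {
        return EXEC;
    }

    // Makes dependent stages instances of this subclass as well
    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new MyCompletableFuture<>();
    }

    public static CompletableFuture<Void> runAsync(Runnable runnable) {
        Objects.requireNonNull(runnable);
        return supplyAsync(() -> {
            runnable.run();
            return null;
        });
    }

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return new MyCompletableFuture<U>().completeAsync(supplier);
    }
}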

With this, you have done everything necessary to define the default executor for all chained stages of MyCompletableFuture. The executor held in EXEC only serves as an example, producing a printout when used, so when you use that example class like

MyCompletableFuture.supplyAsync(() -> "test")
    .thenApplyAsync(String::toUpperCase)
    .thenAcceptAsync(System.out::println);

it will print

executing java.util.concurrent.CompletableFuture$AsyncSupply@65ab7765
executing java.util.concurrent.CompletableFuture$UniApply@119d7047
executing java.util.concurrent.CompletableFuture$UniAccept@404b9385
TEST
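
In practice, the default executor would more likely be a real thread pool than a thread-per-task lambda. The following is a minimal sketch of that variation, assuming Java 9 or later. The class name PooledFuture, the helper stageThreads(), the thread-name prefix "my-pool-" and the pool size of 2 are all made up for illustration. In a Spring application the commonThreadPool bean would somehow have to be made reachable from the subclass, which is not shown here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class PooledFuture<T> extends CompletableFuture<T> {
    private static final AtomicInteger IDS = new AtomicInteger();

    // Hypothetical shared pool; daemon threads so the JVM can exit without shutdown()
    static final ExecutorService POOL = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "my-pool-" + IDS.incrementAndGet());
        t.setDaemon(true);
        return t;
    });

    // Default for every *Async call on this future that has no explicit executor
    @Override
    public Executor defaultExecutor() {
        return POOL;
    }

    // Dependent stages become PooledFuture instances too, so they inherit the default
    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new PooledFuture<>();
    }

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return new PooledFuture<U>().completeAsync(supplier);
    }

    // Returns the names of the threads that ran the first and the chained stage
    static String[] stageThreads() {
        return PooledFuture.supplyAsync(() -> Thread.currentThread().getName())
            .thenApplyAsync(first -> new String[] { first, Thread.currentThread().getName() })
            .join();
    }

    public static void main(String[] args) {
        for (String name : stageThreads()) {
            System.out.println(name);
        }
    }
}
```

Both stages should report a thread whose name starts with "my-pool-". Which of the two pool threads runs each stage is not fixed.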
Holger answered Nov 15 '22 19:11