
How to better use ExecutorService in multithreading environment?

I need to build a library that offers both synchronous and asynchronous methods.

  • executeSynchronous() - waits until I have a result, returns the result.
  • executeAsynchronous() - returns a Future immediately which can be processed after other things are done, if needed.

Core Logic of my Library

Customers will call our library by passing a DataKey builder object. We construct a URL from that DataKey, make an HTTP client call to that URL, and once the response comes back as a JSON string we return it to the customer unchanged, wrapped in a DataResponse object. Some customers will call executeSynchronous() and some will call executeAsynchronous(), which is why the library needs to provide the two methods separately.

Interface:

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}

And then I have my DataClient which implements the above Client interface:

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // do I need to have all threads as non-daemon or I can have daemon thread for my use case?
    private ExecutorService executor = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        DataResponse dataResponse = null;
        Future<DataResponse> future = null;

        try {
            future = executeAsynchronous(key);
            dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
            future.cancel(true); // terminating tasks that have timed out
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(DataKey key) {
        Future<DataResponse> future = null;

        try {
            Task task = new Task(key, restTemplate);
            future = executor.submit(task); 
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
        }

        return future;
    }
}

Simple class which will perform the actual task:

public class Task implements Callable<DataResponse> {

    private DataKey key;
    private RestTemplate restTemplate;

    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) { // should I catch RuntimeException or just Exception here?
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
}

I have a few questions about the solution above:

  • Should I use daemon or non-daemon threads for this use case?
  • I am terminating tasks that have timed out so that they don't occupy one of my limited 10 threads for a long time. Does the way I am doing it look right?
  • In my call() method I am catching Exception. Should I catch RuntimeException there instead? What is the difference between catching Exception and catching RuntimeException?

When I started working on this solution, I was not terminating tasks that had timed out. I reported the timeout to the client, but the task continued to run in the thread pool (potentially occupying one of my limited 10 threads for a long time). After some research online, I found that I can cancel tasks that have timed out by calling cancel on the Future, as shown below:

future.cancel(true);

But I want to make sure: does the way I cancel timed-out tasks in my executeSynchronous method look right?

Calling cancel() on the Future will stop the task from running if it is still in the queue, so I am not sure whether what I am doing is right. What is the right approach here?

If there is a better way, can anyone provide an example?

Should we always terminate tasks that have timed out? If we don't, what impact might that have?

john asked Mar 15 '15 18:03



2 Answers

Should I use daemon or non daemon threads for my above use case?

It depends on whether or not you want these threads to stop the program from exiting. The JVM will exit when the last non-daemon thread finishes.

If these tasks can be killed at any time when the JVM exits, they should run on daemon threads. If you want the JVM to wait for them, make the threads non-daemon.

See: java daemon thread and non-daemon thread
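
As a minimal sketch of the daemon option: Executors.newFixedThreadPool accepts a ThreadFactory, and the factory can mark each worker as a daemon before it starts. The class name DaemonPools, the method name and the thread-name prefix below are made up for illustration and are not part of the question's code.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper; the name is an assumption, not from the question.
final class DaemonPools {

    private DaemonPools() {}

    // Builds a fixed-size pool whose workers are daemon threads, so the
    // pool on its own never keeps the JVM from exiting.
    static ExecutorService newDaemonFixedPool(int size, String namePrefix) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, namePrefix + "-" + counter.incrementAndGet());
            t.setDaemon(true); // must be set before the thread is started
            return t;
        };
        return Executors.newFixedThreadPool(size, factory);
    }
}
```

In the question's DataClient this would replace Executors.newFixedThreadPool(10) with something like DaemonPools.newDaemonFixedPool(10, "data-client"). The trade-off is that in-flight requests are simply abandoned when the last non-daemon thread finishes.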

Also, I am terminating the tasks that have timed out so that it doesn't occupy one of my limited 10 threads for a long time. Does that look right the way I am doing it?

Yes and no. You are properly calling cancel() on the Future, which will stop the task from running if it is still in the queue. However, if a thread is already running the task, cancel(true) will just interrupt that thread. Chances are that the restTemplate calls are not interruptible, so the interrupt will be ignored. Only certain methods (like Thread.sleep(...)) are interruptible and throw InterruptedException. So calling future.cancel(true) won't stop the operation or terminate the thread.

See: Thread not interrupting
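
The difference can be observed with a small experiment. This is only a sketch: CancelDemo and its method are hypothetical names, and the busy loop merely stands in for blocking code that never checks the interrupt flag (it is not a model of what RestTemplate does internally).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of the question's code.
final class CancelDemo {

    private CancelDemo() {}

    // Returns true if the worker was still busy 500 ms after cancel(true).
    static boolean stillRunningAfterCancel(boolean interruptible) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean stop = new AtomicBoolean(false);
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                try {
                    if (interruptible) {
                        Thread.sleep(60_000); // reacts to interrupt by throwing
                    } else {
                        while (!stop.get()) {
                            // stand-in for work that ignores the interrupt flag
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag and exit
                } finally {
                    finished.countDown();
                }
            });
            started.await();      // make sure the task is really running
            future.cancel(true);  // marks the Future cancelled and interrupts the worker
            return !finished.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo was interrupted", e);
        } finally {
            stop.set(true);       // release the non-interruptible loop
            pool.shutdownNow();
        }
    }
}
```

In both cases the Future reports isCancelled() == true to the caller; only in the interruptible case is the pool thread actually freed.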

One thing you could do is to put a cancel() method on your Task object which forcefully closes the restTemplate. You will need to experiment with this. Another idea is to set some sort of timeout on the restTemplate connection or IO so it doesn't wait forever.

If you are working with the Spring RestTemplate, there is no direct close method, but I believe you can close the underlying connection. That connection may be managed via the SimpleClientHttpRequestFactory, in which case you will need to call disconnect() on the underlying HttpURLConnection.
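
For the timeout idea, here is a minimal sketch at the HttpURLConnection level, using only the JDK so that it runs without Spring. TimeoutConfig, the URL and the timeout values are illustrative assumptions. As far as I know, Spring's SimpleClientHttpRequestFactory exposes setConnectTimeout and setReadTimeout setters as well, and a configured factory can be passed to the RestTemplate constructor, but check that against the Spring version you use.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.URI;

// Hypothetical helper; the name is an assumption, not from the question.
final class TimeoutConfig {

    private TimeoutConfig() {}

    // Prepares a connection (no network traffic happens yet) whose connect
    // and read phases are both bounded, so a stuck server cannot hold a
    // pool thread indefinitely.
    static HttpURLConnection prepare(String url, int connectTimeoutMs, int readTimeoutMs) {
        try {
            HttpURLConnection connection =
                    (HttpURLConnection) URI.create(url).toURL().openConnection();
            connection.setConnectTimeout(connectTimeoutMs); // max wait to establish the connection
            connection.setReadTimeout(readTimeoutMs);       // max wait for data once connected
            return connection;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With both timeouts set, a slow request fails inside the task with a SocketTimeoutException, which frees the worker thread without relying on interruption.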

In my call() method, I am catching Exception. Should I catch RuntimeException there?

RuntimeException extends Exception so you will already catch them.

What is the difference if I catch Exception or RuntimeException?

Catching Exception catches both the checked (non-runtime) exceptions and the runtime exceptions. Catching just RuntimeException means that any checked exceptions will not be caught there and will propagate out of the method, which must then declare them.

RuntimeExceptions are special exceptions that do not need to be declared or handled by the code. For example, any code can throw an IllegalArgumentException without declaring the method as throws IllegalArgumentException. With checked exceptions, it is a compiler error if the exception is neither caught nor declared in the throws clause of the calling method; this is not true of RuntimeExceptions.
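
A small sketch of the distinction, with made-up names (ExceptionKinds, parsePositive, load, handledBy) that are not from the question:

```java
import java.io.IOException;

// Hypothetical demo class; not part of the question's code.
final class ExceptionKinds {

    private ExceptionKinds() {}

    // Unchecked: no throws clause is required.
    static int parsePositive(String s) {
        int n = Integer.parseInt(s); // may throw NumberFormatException (unchecked)
        if (n <= 0) {
            throw new IllegalArgumentException("not positive: " + s);
        }
        return n;
    }

    // Checked: the compiler forces a throws clause or a catch block.
    static void load(boolean fail) throws IOException {
        if (fail) {
            throw new IOException("simulated I/O failure");
        }
    }

    // Reports which handler ends up dealing with each kind of exception.
    static String handledBy(boolean throwChecked) {
        try {
            if (throwChecked) {
                load(true);
            } else {
                parsePositive("-1");
            }
            return "none";
        } catch (RuntimeException e) {
            return "RuntimeException handler: " + e.getClass().getSimpleName();
        } catch (Exception e) {
            // Without this block, handledBy would need "throws IOException".
            return "Exception handler: " + e.getClass().getSimpleName();
        }
    }
}
```

Removing the catch (Exception e) block makes handledBy fail to compile unless it declares throws IOException, which is the practical difference between the two catch clauses.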

Here's a good answer on the topic:

  • Java: checked vs unchecked exception explanation
Gray answered Oct 08 '22 09:10


Should I use daemon or non daemon threads for my above use case?

It depends. In this case I would prefer daemon threads, because it is convenient for users of the client if the library does not prevent the process from exiting.

Does that look right the way I am doing it?

No, it doesn't. Interrupting an IO task is pretty hard. Try setting a timeout on the RestTemplate too. Canceling the future in this case seems meaningless.

What is the difference if I catch Exception or RuntimeException?

If the code in your try block throws no checked exceptions, there is no difference :) Only RuntimeExceptions are possible in that case.

One more important note: implementing the sync call as async + waiting is a bad idea. It is pointless and consumes one thread from the thread pool per call. Just create an instance of the Task and call it in the current thread!
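
A rough sketch of that shape, using a hypothetical NameTask in place of the question's Task so that the example is self-contained. It returns the name of the thread it ran on, which makes it easy to see where the work happens.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical demo class; NameTask stands in for the question's Task.
final class SyncVsAsync {

    private SyncVsAsync() {}

    static final class NameTask implements Callable<String> {
        @Override
        public String call() {
            return Thread.currentThread().getName();
        }
    }

    // Synchronous path: run the task on the caller's own thread.
    // No pool thread is consumed while the caller waits.
    static String executeSynchronous() {
        return new NameTask().call();
    }

    // Asynchronous path: hand the same task to the pool.
    static Future<String> executeAsynchronous(ExecutorService executor) {
        return executor.submit(new NameTask());
    }
}
```

One consequence of this design: the synchronous path no longer has future.get(timeout, unit) to bound the wait, so the timeout has to be enforced by the HTTP client itself.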

Denis Borovikov answered Oct 08 '22 08:10