Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Secure and effective way for waiting for asynchronous task

In the system, I have an object - let's call it TaskProcessor. It holds queue of tasks, which are executed by some pool of threads (ExecutorService + PriorityBlockingQueue) The result of each task is saved in the database under some unique identifier.

The user, who knows this unique identifier, may check the result of this task. The result could be in the database, but also the task could still wait in the queue for execution. In that case, UserThread should wait until the task will be finished.

Additionally, the following assumptions are valid:

  • Someone else could enqueue the task to TaskProcessor and some random UserThread can access the result if he knows the unique identifier.

  • UserThread and TaskProcess are in the same app. TaskProcessor contains a pool of threads, and UserThread is simply servlet Thread.

  • UserThread should be blocked when asking for the result, and the result is not completed yet. UserThread should be unblocked immediately after TaskProcessor complete task (or tasks) grouped by a unique identifier

My first attempt (the naive one), was to check the result in the loop and sleep for some time:

// UserThread
while(!checkResultIsInDatabase(uniqueIdentifier))
  sleep(someTime)

But I don't like it. First of all, I am wasting database connections. Moreover, if the task would be finished right after sleep, then the user will wait even if the result just appeared.

Next attempt was based on wait/notify:

//UserThread 
while (!checkResultIsInDatabase())
  taskProcessor.wait()

//TaskProcessor
... some complicated calculations
this.notifyAll()

But I don't like it either. If more UserThreads will use TaskProcessor, then they will be wakened up unnecessarily every time some task would be completed and moreover - they will make unnecessary database calls.

The last attempt was based on something which I called waitingRoom:

//UserThread
Object mutex = new Object();
taskProcessor.addToWaitingRoom(uniqueIdentifier, mutex)
while (!checkResultIsInDatabase())
  mutex.wait()

//TaskProcessor
... Some complicated calculations
if (uniqueIdentifierExistInWaitingRoom(taskUniqueIdentifier))
  getMutexFromWaitingRoom(taskUniqueIdentifier).notify()

But it seems to be not secure. Between database check and wait(), the task could be completed (notify() wouldn't be effective because UserThread didn't invoke wait() yet), which may end up with deadlock.

It seems, that I should synchronize it somewhere. But I am afraid that it will be not effective. Is there a way to correct any of my attempts, to make them secure and effective? Or maybe there is some other, better way to do this?

like image 319
mkuligowski Avatar asked Mar 07 '19 22:03

mkuligowski


People also ask

What is asynchronous task queue?

Imagine, you are now pushing the computing part of your application to a queue. This queue would handle every task independently, minimizing user latency and ensuring that even if one of them fails, the rest of the task/data gets processed. This is Asynchronous task processing.

How does async and await work?

The async keyword turns a method into an async method, which allows you to use the await keyword in its body. When the await keyword is applied, it suspends the calling method and yields control back to its caller until the awaited task is complete. await can only be used inside an async method.

What of the following reasons are appropriate to use asynchronous programming?

Asynchronous programming can help systems run more effectively, depending on the situation and often prevents long wait times. For example, if a task you want to perform uses a lot of input and output, asynchronous programming lets other tasks run, whereas synchronous programming would create a time block.

Does Async improve performance?

Using asynchronous code does not give an increased performance on a development machine. The reason is that there is not enough load to see the overall benefit. But for a production environment, it can process more requests.


1 Answers

You seem to be looking for some sort of future / promise abstraction. Take a look at CompletableFuture, available since Java 8.

CompletableFuture<Void> future = CompletableFuture.runAsync(db::yourExpensiveOperation, executor);

// best approach: attach some callback to run when the future is complete, and handle any errors
future.thenRun(this::onSuccess)
        .exceptionally(ex -> logger.error("err", ex));

// if you really need the current thread to block, waiting for the async result:
future.join(); // blocking! returns the result when complete or throws a CompletionException on error

You can also return a (meaningful) value from your async operation and pass the result to the callback. To make use of this, take a look at supplyAsync(), thenAccept(), thenApply(), whenComplete() and the like.

You can also combine multiple futures into one and a lot more.

like image 198
Costi Ciudatu Avatar answered Oct 05 '22 12:10

Costi Ciudatu