I have scenario as depicted in the below diagram
Here main thread ismy java application.it opens one WM thread for execution. WM handling task execution.he needs to call number of task for execution. Suppose it contains task T1,T2,T3
T3 is dependent on T2 and T2 is dependent on T1. WM first call RM to perform task execution of T1. T1 can give response in paging or be after completion of T1.
By question is how can I wait for T1 to complete and then start T2's execution.and how can I notify to WM when T1 partially complete sends data in paging.
This is the simple scenario but in case of T1,T2,T3,T4. T3 is dependent on T1 and T2.
Code :
public class TestAsync implements TaskCallBack {
public static ExecutorService exService = Executors.newFixedThreadPool(5);
public static void main(String args[]) throws InterruptedException, ExecutionException{
Task t1 = new Task();
t1.doTask(new TestAsync());
}
public static ExecutorService getPool(){
return exService;
}
@Override
public void taskCompleted(String obj) {
System.out.println(obj);
}
}
class Task {
public void doTask(TaskCallBack tcb) throws InterruptedException, ExecutionException{
FutureTask<String> ft = new FutureTask<>(new Task1());
TestAsync.getPool().execute(ft);
tcb.taskCompleted(ft.get());
}
}
class Task1 implements Callable<String>{
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName());
return "done";
}
interface TaskCallBack{
public void TaskCompleted(String obj);
}
}
This is pretty interesting topic. I faced similar problem developing highly parallel network packet processing solution. I'll share my findings, but before that I should say it is always a bad idea to use some kind of ad-hoc solution for any parallel system.
Debugging, optimisation and further development might become a nightmare without proper architecture support. Let's say we have three dependent tasks:
First solution
will be to to introduce composite or compound task
abstraction, in order to let dependent tasks be executed in a proper order and get rid of delays, waiting/blocking, complex task management, etc.
I will use simplified code in order to illustrate this approach:
/**
* Defines a high-level task contract.
* Let's pretend it is enough to have it this simple.
*/
interface Task extends Runnable {
}
/**
* Defines a simple way to group dependent tasks.
*
* Please note, this is the simplest way to make sure dependent tasks will be
* executed in a specified order without any additional overhead.
*/
class CompoundTasks implements Task {
private List<Task> tasks = ...;
public void add(Task task) {
tasks.add(task);
}
@Override
public void run() {
for(Task t : tasks) {
t.run();
}
}
}
Second solution
will be to let tasks have explicit dependencies and make executors aware of that. Basically, the rule is quite simple - if task has unresolved dependencies it should be postponed. This approach can be easily implemented and works pretty fine.
Please note, second solution will introduce a tiny performance penalty due to the fact some resources will be needed to verify tasks, manage queue, etc.
Let's evolve our task-based approach:
/**
* Defines yet another abstraction to make dependencies
* visible and properly tracked.
*
*/
abstract class DependentTask implements Task {
private List<DependentTask> dependencies = ...;
public void addDependency(DependentTask task) {
dependencies.add(task);
}
/**
* Verifies task can be processed.
*/
public boolean hasDependenciesResolved() {
boolean result = true;
for(DependentTask t : dependencies) {
if(!t.hasDependenciesResolved()) {
result = false;
break;
}
}
return result;
}
@Override
public abstract void run();
}
/**
* Implements a very basic queue aware of task dependencies.
*
* Basically, the idea is just to avoid any blocking state. If task can't
* be processed (because of unresolved dependencies) it should be
* postponed and verified later.
*/
class TaskQueue<T extends DependentTask> implements Runnable {
private Queue<T> queue = ...;
@Override
public void run() {
while(true) {
if(!queue.isEmpty()) {
T task = queue.poll();
// Verify all dependencies have been resolved.
if(task.hasDependenciesResolved()) {
task.run(); // process task if there is no unresolved
// dependencies
}else{
queue.add(task); // return task to the queue
}
}else{
// sleep for some reasonable amount of time
}
}
}
}
Both approaches are easily traceable, so you always will be able to understand what is going on.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With