I want to execute several different tasks in parallel, with the rule that a task which is already queued or currently processing does not get re-queued. I have read up a little on the Java API and come up with the code below, which seems to work. Can anybody shed light on whether this is the best approach? Are there any dangers (thread safety?) or better ways to do this? The code is as follows:
import java.util.HashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestExecution implements Runnable {
    String key1;
    String key2;
    static HashMap<TestExecution, Future<?>> executions = new HashMap<TestExecution, Future<?>>();
    static LinkedBlockingQueue<Runnable> q = new LinkedBlockingQueue<Runnable>();
    static ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, q);

    public static void main(String[] args) {
        try {
            execute(new TestExecution("A", "A"));
            execute(new TestExecution("A", "A"));
            execute(new TestExecution("B", "B"));
            Thread.sleep(8000);
            execute(new TestExecution("B", "B"));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static boolean execute(TestExecution e) {
        System.out.println("Handling "+e.key1+":"+e.key2);
        if (executions.containsKey(e)) {
            Future<?> f = (Future<?>) executions.get(e);
            if (f.isDone()) {
                System.out.println("Previous execution has completed");
                executions.remove(e);
            } else {
                System.out.println("Previous execution still running");
                return false;
            }
        }
        else {
            System.out.println("No previous execution");
        }
        Future<?> f = tpe.submit(e);
        executions.put(e, f);
        return true;
    }

    public TestExecution(String key1, String key2) {
        this.key1 = key1;
        this.key2 = key2;
    }

    public boolean equals(Object obj)
    {
        if (obj instanceof TestExecution)
        {
            TestExecution t = (TestExecution) obj;
            return (key1.equals(t.key1) && key2.equals(t.key2));
        }
        return false;
    }

    public int hashCode ()
    {
        return key1.hashCode()+key2.hashCode();
    }

    public void run() {
        try {
            System.out.println("Start processing "+key1+":"+key2);
            Thread.sleep(4000);
            System.out.println("Finish processing "+key1+":"+key2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Follow up to comment below:
The plan is that the tasks will be triggered by cron calling a RESTful web service. For example, below is the setup for one task scheduled every two minutes, plus another triggered at 9:30 every day.
0/2 * * * * restclient.pl key11 key12
30 09 * * * restclient.pl key21 key22
In this case, if task key11:key12 is running or already queued to run, I don't want to queue another instance. I understand there are other options for scheduling, but we tend to use cron for other tasks, so I want to keep it.
Second update: in response to the comments so far, I have rewritten the code. Could you comment on any issues with the following updated solution?
import java.util.concurrent.LinkedBlockingQueue;

public class TestExecution implements Runnable {
    String key1;
    String key2;
    static TestThreadPoolExecutor tpe = new TestThreadPoolExecutor(new LinkedBlockingQueue<Runnable>());

    public static void main(String[] args) {
        try {
            tpe.execute(new TestExecution("A", "A"));
            tpe.execute(new TestExecution("A", "A"));
            tpe.execute(new TestExecution("B", "B"));
            Thread.sleep(8000);
            tpe.execute(new TestExecution("B", "B"));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public TestExecution(String key1, String key2) {
        this.key1 = key1;
        this.key2 = key2;
    }

    public boolean equals(Object obj)
    {
        if (obj instanceof TestExecution)
        {
            TestExecution t = (TestExecution) obj;
            return (key1.equals(t.key1) && key2.equals(t.key2));
        }
        return false;
    }

    public int hashCode ()
    {
        return key1.hashCode()+key2.hashCode();
    }

    public void run() {
        try {
            System.out.println("Start processing "+key1+":"+key2);
            Thread.sleep(4000);
            System.out.println("Finish processing "+key1+":"+key2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestThreadPoolExecutor extends ThreadPoolExecutor {
    Set<Runnable> executions = Collections.synchronizedSet(new HashSet<Runnable>());

    public TestThreadPoolExecutor(LinkedBlockingQueue<Runnable> q) {
        super(2, 5, 1, TimeUnit.MINUTES, q);
    }

    public void execute(Runnable command) {
        if (executions.contains(command)) {
            System.out.println("Previous execution still running");
            return;
        }
        else {
            System.out.println("No previous execution");
        }
        super.execute(command);
        executions.add(command);
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        executions.remove(r);
    }
}
In computer programming, a thread pool is a software design pattern for achieving concurrency of execution in a computer program. Often also called a replicated workers or worker-crew model, a thread pool maintains multiple threads waiting for tasks to be allocated for concurrent execution by the supervising program.
Once a thread in the thread pool completes its task, it's returned to a queue of waiting threads. From this moment it can be reused. This reuse enables applications to avoid the cost of creating a new thread for each task.
A thread pool is a pool of threads that can be "reused" to execute tasks, so that each thread may execute more than one task. A thread pool is an alternative to creating a new thread for each task you need to execute.
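The reuse described above can be seen in a small sketch. It assumes Java 8 or later (lambdas, `ConcurrentHashMap.newKeySet()`); the class name `PoolReuseDemo` and the method `runTasks` are made up for illustration and do not come from the question or the answer.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PoolReuseDemo {
    // Runs taskCount short tasks on a pool of poolSize threads and returns
    // the names of the distinct threads that actually executed them.
    static Set<String> runTasks(int poolSize, int taskCount) throws InterruptedException {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < taskCount; i++) {
            // Each task only records which pool thread ran it.
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                              // accept no new tasks
        pool.awaitTermination(10, TimeUnit.SECONDS);  // wait for queued tasks to finish
        return threadNames;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = runTasks(2, 10);
        System.out.println("10 tasks ran on " + names.size() + " thread(s): " + names);
    }
}
```

Ten tasks are submitted but the pool never holds more than two threads, so each thread necessarily runs several tasks.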
Here is how I would handle and avoid duplicates:
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.*;

public class TestExecution implements Callable<Void> {
    private static final ThreadPoolExecutor TPE = new ThreadPoolExecutor(2, 5, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>());
    private static final Set<TestExecution> TE_SET = Collections.newSetFromMap(new ConcurrentHashMap<TestExecution, Boolean>());

    private final String key1;
    private final String key2;

    public static void main(String... args) throws InterruptedException {
        new TestExecution("A", "A").execute();
        new TestExecution("A", "A").execute();
        new TestExecution("B", "B").execute();
        Thread.sleep(8000);
        new TestExecution("A", "A").execute();
        new TestExecution("B", "B").execute();
        new TestExecution("B", "B").execute();
        TPE.shutdown();
    }

    public TestExecution(String key1, String key2) {
        this.key1 = key1;
        this.key2 = key2;
    }

    void execute() {
        if (TE_SET.add(this)) {
            System.out.println("Handling " + this);
            TPE.submit(this);
        } else {
            System.out.println("... ignoring duplicate " + this);
        }
    }

    public boolean equals(Object obj) {
        return obj instanceof TestExecution &&
                key1.equals(((TestExecution) obj).key1) &&
                key2.equals(((TestExecution) obj).key2);
    }

    public int hashCode() {
        return key1.hashCode() * 31 + key2.hashCode();
    }

    @Override
    public Void call() throws InterruptedException {
        if (!TE_SET.remove(this)) {
            System.out.println("... dropping duplicate " + this);
            return null;
        }
        System.out.println("Start processing " + this);
        Thread.sleep(4000);
        System.out.println("Finish processing " + this);
        return null;
    }

    public String toString() {
        return key1 + ':' + key2;
    }
}
prints
Handling A:A
... ignoring duplicate A:A
Handling B:B
Start processing A:A
Start processing B:B
Finish processing A:A
Finish processing B:B
Handling A:A
Handling B:B
Start processing A:A
Start processing B:B
... ignoring duplicate B:B
Finish processing B:B
Finish processing A:A
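Note that the code above removes the key from the set as soon as the task starts, so it only rejects duplicates of tasks that are still queued. If duplicates should also be rejected while the task is running, as the question asks, one possible variation is to keep the key in the set until the task finishes. The following is only a sketch under assumptions: it needs Java 8 or later, the class `DedupingSubmitter` and its `submit` method are hypothetical names, and it uses plain `String` keys instead of `TestExecution` objects.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

public class DedupingSubmitter {
    private final ExecutorService pool;
    // Keys of tasks that are either queued or currently running.
    private final Set<String> active = ConcurrentHashMap.newKeySet();

    public DedupingSubmitter(ExecutorService pool) {
        this.pool = pool;
    }

    // Returns true if the task was accepted, false if a task with the
    // same key is already queued or running.
    public boolean submit(String key, Runnable task) {
        if (!active.add(key)) {           // atomic check-and-claim
            return false;
        }
        try {
            pool.execute(() -> {
                try {
                    task.run();
                } finally {
                    active.remove(key);   // release the key even if the task throws
                }
            });
            return true;
        } catch (RejectedExecutionException e) {
            active.remove(key);           // pool refused the task; do not leave a stale key
            throw e;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        DedupingSubmitter submitter = new DedupingSubmitter(pool);
        Runnable slow = () -> {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        System.out.println(submitter.submit("A:A", slow)); // accepted
        System.out.println(submitter.submit("A:A", slow)); // rejected while A:A is still active
        System.out.println(submitter.submit("B:B", slow)); // accepted
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```

Because `Set.add` both tests and claims the key in one atomic step, there is no window between checking and inserting in which a second caller could slip in, and the `finally` block makes the key available again however the task ends.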