The Java code sample below uses a DelayQueue to process tasks. However, inserting a task from another thread appears to disrupt the behaviour I expect.
Apologies that the code example is so long, but in summary:
The output that I get from the code sample is:
------initial tasks ---------------
task A due in 0ms
task B due in 9ms
task C due in 99ms
task D due in 999ms
task E due in 9999ms
task F due in 99999ms
------processing--------------------
time = 5 task A due in -1ms
time = 14 task B due in 0ms
time = 104 task C due in 0ms
time = 1004 task D due in 0ms
time = 3003 added task Z due in 0ms
------remaining after 15007ms -----------
task F due in 84996ms
task E due in -5003ms
task Z due in -12004ms
My question is: why, after 15000ms, are there expired tasks remaining in the DelayQueue (i.e. tasks for which getDelay() returns a negative value)?
Some things that I have checked:
I will be most interested to learn how to resolve this problem. Thank you in advance for your assistance. (and for all of those Stack Overflow answers that have helped me to date :)
package test;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Test10_DelayQueue {

    private static final TimeUnit delayUnit = TimeUnit.MILLISECONDS;
    private static final TimeUnit ripeUnit = TimeUnit.NANOSECONDS;
    static long startTime;

    static class Task implements Delayed {
        public long ripe;
        public String name;

        public Task(String name, int delay) {
            this.name = name;
            ripe = System.nanoTime() + ripeUnit.convert(delay, delayUnit);
        }

        @Override
        public boolean equals(Object obj) {
            if (obj instanceof Task) {
                return compareTo((Task) obj) == 0;
            }
            return false;
        }

        @Override
        public int hashCode() {
            int hash = 7;
            hash = 67 * hash + (int) (this.ripe ^ (this.ripe >>> 32));
            hash = 67 * hash + (this.name != null ? this.name.hashCode() : 0);
            return hash;
        }

        @Override
        public int compareTo(Delayed delayed) {
            if (delayed instanceof Task) {
                Task that = (Task) delayed;
                return (int) (this.ripe - that.ripe);
            }
            throw new UnsupportedOperationException();
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(ripe - System.nanoTime(), ripeUnit);
        }

        @Override
        public String toString() {
            return "task " + name + " due in " + String.valueOf(getDelay(delayUnit) + "ms");
        }
    }

    static class TaskAdder implements Runnable {
        DelayQueue dq;
        int delay;

        public TaskAdder(DelayQueue dq, int delay) {
            this.dq = dq;
            this.delay = delay;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(delay);
                Task z = new Task("Z", 0);
                dq.add(z);
                Long elapsed = System.currentTimeMillis() - startTime;
                System.out.println("time = " + elapsed + "\tadded " + z);
            } catch (InterruptedException e) {
            }
        }
    }

    public static void main(String[] args) {
        startTime = System.currentTimeMillis();
        DelayQueue<Task> taskQ = new DelayQueue<Task>();
        Thread thread = new Thread(new TaskAdder(taskQ, 3000));
        thread.start();
        taskQ.add(new Task("A", 0));
        taskQ.add(new Task("B", 10));
        taskQ.add(new Task("C", 100));
        taskQ.add(new Task("D", 1000));
        taskQ.add(new Task("E", 10000));
        taskQ.add(new Task("F", 100000));
        System.out.println("------initial tasks ---------------");
        Task[] tasks = taskQ.toArray(new Task[0]);
        for (int i = 0; i < tasks.length; i++) {
            System.out.println(tasks[i]);
        }
        System.out.println("------processing--------------------");
        try {
            Long elapsed = System.currentTimeMillis() - startTime;
            while (elapsed < 15000) {
                Task task = taskQ.poll(1, TimeUnit.SECONDS);
                elapsed = System.currentTimeMillis() - startTime;
                if (task != null) {
                    System.out.println("time = " + elapsed + "\t" + task);
                }
            }
            System.out.println("------remaining after " + elapsed + "ms -----------");
            tasks = taskQ.toArray(new Task[0]);
            for (int i = 0; i < tasks.length; i++) {
                System.out.println(tasks[i]);
            }
        } catch (InterruptedException e) {
        }
    }
}
Because your compareTo method is flawed. A correct implementation is shown below; once you make this change, the problem should go away. Always try to reuse an existing compareTo implementation where you can, or at least adhere to the compareTo contract:
return Long.valueOf(this.ripe).compareTo(that.ripe);
The reason is numerical overflow.
Your compareTo() method casts a long difference in nanoseconds to an int, but an int cannot hold more than about 2.1 seconds' worth of nanoseconds (Integer.MAX_VALUE is 2,147,483,647). Larger differences overflow and give a more or less random result, so a task can end up ordered behind one with a later expiry whenever the two expiry times are more than about 2.1 seconds apart.
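To make the overflow concrete, here is a small sketch of the arithmetic. The class name CompareOverflowDemo, the helper castCompare and the fixed offsets are illustrative assumptions: the offsets only approximate the gaps between tasks Z, E and F in the question (3 s, 10 s and 100 s after start), not the exact System.nanoTime() values from a real run.

```java
class CompareOverflowDemo {

    /** Same arithmetic as the compareTo() in the question: long difference cast to int. */
    static int castCompare(long a, long b) {
        return (int) (a - b);
    }

    public static void main(String[] args) {
        // Hypothetical expiry times in nanoseconds, relative to program start.
        long z = 3_000_000_000L;    // roughly task Z (added at 3 s, due immediately)
        long e = 10_000_000_000L;   // roughly task E (due at 10 s)
        long f = 100_000_000_000L;  // roughly task F (due at 100 s)

        // The true differences are negative, but the cast keeps only the low 32 bits.
        System.out.println(castCompare(z, e));   // 1589934592 (positive: Z sorts after E)
        System.out.println(castCompare(e, f));   // 194313216  (positive: E sorts after F)

        // Long.compare never subtracts, so it cannot overflow.
        System.out.println(Long.compare(z, e));  // -1
        System.out.println(Long.compare(e, f));  // -1
    }
}
```

Under these assumed offsets the cast version ranks F before E and E before Z, which is consistent with F sitting at the head of the queue in the "remaining" output while E and Z are already overdue.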
poll() doesn't look beyond the next item in the queue (the head), and the order of the items is defined by the compareTo method when each item is placed in the queue.
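The head-only behaviour can be reproduced in isolation. The sketch below is not the code from the question: HeadOnlyPollDemo, its Task class, the brokenOrder switch and pollOnce are hypothetical names introduced for illustration. It queues one task due in 90 seconds and one that is already due, then calls the non-blocking poll() once with each ordering strategy.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class HeadOnlyPollDemo {

    /** Hypothetical task whose ordering strategy can be switched for the demo. */
    static final class Task implements Delayed {
        final String name;
        final long ripe;            // absolute System.nanoTime() deadline
        final boolean brokenOrder;  // true = reproduce the int-cast comparison

        Task(String name, long ripe, boolean brokenOrder) {
            this.name = name;
            this.ripe = ripe;
            this.brokenOrder = brokenOrder;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(ripe - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            Task that = (Task) other; // the demo only ever queues Task instances
            return brokenOrder
                    ? (int) (this.ripe - that.ripe)         // overflows for large gaps
                    : Long.compare(this.ripe, that.ripe);   // safe comparison
        }
    }

    /** Queues a far-future task and an already-due task, then polls once without waiting. */
    static String pollOnce(boolean brokenOrder) {
        long now = System.nanoTime();
        DelayQueue<Task> queue = new DelayQueue<>();
        queue.add(new Task("far", now + TimeUnit.SECONDS.toNanos(90), brokenOrder));
        queue.add(new Task("due", now, brokenOrder));
        Task head = queue.poll(); // returns null unless the head itself has expired
        return head == null ? null : head.name;
    }

    public static void main(String[] args) {
        System.out.println("int-cast compareTo: " + pollOnce(true));
        System.out.println("Long.compare:       " + pollOnce(false));
    }
}
```

With the int-cast ordering, the 90-second gap overflows to a positive value, "far" is kept at the head, and poll() returns null even though "due" has expired. With Long.compare, "due" is at the head and is returned immediately.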
Also, equals() should agree with hashCode(), as well as with compareTo(). See the Javadoc for hashCode() for more information on this.
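One way to keep the three methods in agreement is to base all of them on the same fields. The following is a minimal sketch, not a drop-in replacement for the Task class in the question: the class name ConsistentTask is hypothetical, the constructor takes an absolute nanoTime deadline rather than a delay, and it assumes name is never null (the original allowed null).

```java
import java.util.Objects;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class ConsistentTask implements Delayed {
    private final String name;
    private final long ripe; // absolute System.nanoTime() deadline

    ConsistentTask(String name, long ripe) {
        this.name = Objects.requireNonNull(name); // assumption: names are non-null
        this.ripe = ripe;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(ripe - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        if (other instanceof ConsistentTask) {
            ConsistentTask that = (ConsistentTask) other;
            int byTime = Long.compare(this.ripe, that.ripe);
            // Tie-break on name so that compareTo == 0 exactly when equals is true.
            return byTime != 0 ? byTime : this.name.compareTo(that.name);
        }
        // Fallback for other Delayed types: compare remaining delays.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS),
                            other.getDelay(TimeUnit.NANOSECONDS));
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof ConsistentTask)) {
            return false;
        }
        ConsistentTask that = (ConsistentTask) obj;
        return ripe == that.ripe && name.equals(that.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(ripe, name); // same fields as equals()
    }
}
```

The tie-break on name is a design choice rather than a requirement: the Comparable contract only strongly recommends that compareTo be consistent with equals, whereas equal objects must always have equal hash codes.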