Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

java.util.concurrent.DelayQueue overlooking expired elements

The java code sample below uses a java DelayQueue to process tasks. However the insertion of a task from another thread appears to disrupt (my) expected behaviour.

Apologies that the code example is so long, but in summary:

  1. The main thread adds 5 tasks (A-E) to a DelayQueue with various delays (0ms, 10ms, 100ms 1000ms, 10000ms)
  2. Another tread is started which adds another task to the DelayQueue after 3000ms
  3. The main thread polls the DelayQueue and reports as each Task expires
  4. After 8000ms the main thread reports the Tasks remaining in the DelayQueue

The output that I get from the code sample is:

------initial tasks ---------------
task A due in 0ms
task B due in 9ms
task C due in 99ms
task D due in 999ms
task E due in 9999ms
task F due in 99999ms
------processing--------------------
time = 5    task A due in -1ms
time = 14   task B due in 0ms
time = 104  task C due in 0ms
time = 1004 task D due in 0ms
time = 3003 added task Z due in 0ms
------remaining after 15007ms -----------
task F due in 84996ms
task E due in -5003ms
task Z due in -12004ms

My question is: why after 15000ms are there expired Tasks remaining in the DelayQueue (ie where GetDelay() returns a -ve value)?

Some things that I have checked:

  • I have implemented compareTo() to define the natural order of Tasks
  • equals() is consistent with compareTo()
  • hashCode() has been overridden

I will be most interested to learn how to resolve this problem. Thank you in advance for your assistance. (and for all of those Stack Overflow answers that have helped me to date :)

    package test;

    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;

    public class Test10_DelayQueue {

       private static final TimeUnit delayUnit = TimeUnit.MILLISECONDS;
       private static final TimeUnit ripeUnit = TimeUnit.NANOSECONDS;

       static long startTime;

       static class Task implements Delayed {    
          public long ripe;
          public String name;    
          public Task(String name, int delay) {
             this.name = name;
             ripe = System.nanoTime() + ripeUnit.convert(delay, delayUnit);
          }

      @Override
      public boolean equals(Object obj) {
         if (obj instanceof Task) {
            return compareTo((Task) obj) == 0;
         }
         return false;
      }

      @Override
      public int hashCode() {
         int hash = 7;
         hash = 67 * hash + (int) (this.ripe ^ (this.ripe >>> 32));
         hash = 67 * hash + (this.name != null ? this.name.hashCode() : 0);
         return hash;
      }

      @Override
      public int compareTo(Delayed delayed) {
         if (delayed instanceof Task) {
            Task that = (Task) delayed;
            return (int) (this.ripe - that.ripe);
         }
         throw new UnsupportedOperationException();
      }

      @Override
      public long getDelay(TimeUnit unit) {
         return unit.convert(ripe - System.nanoTime(), ripeUnit);
      }

      @Override
      public String toString() {
         return "task " + name + " due in " + String.valueOf(getDelay(delayUnit) + "ms");
          }
       }

       static class TaskAdder implements Runnable {

      DelayQueue dq;
      int delay;

      public TaskAdder(DelayQueue dq, int delay) {
         this.dq = dq;
         this.delay = delay;
      }

      @Override
      public void run() {
         try {
            Thread.sleep(delay);

            Task z = new Task("Z", 0);
            dq.add(z);

            Long elapsed = System.currentTimeMillis() - startTime;

            System.out.println("time = " + elapsed + "\tadded " + z);

         } catch (InterruptedException e) {
         }
      }
    }

    public static void main(String[] args) {
      startTime = System.currentTimeMillis();
      DelayQueue<Task> taskQ = new DelayQueue<Task>();

      Thread thread = new Thread(new TaskAdder(taskQ, 3000));
      thread.start();

      taskQ.add(new Task("A", 0));
      taskQ.add(new Task("B", 10));
      taskQ.add(new Task("C", 100));
      taskQ.add(new Task("D", 1000));
      taskQ.add(new Task("E", 10000));
      taskQ.add(new Task("F", 100000));

      System.out.println("------initial tasks ---------------");
      Task[] tasks = taskQ.toArray(new Task[0]);
      for (int i = 0; i < tasks.length; i++) {
         System.out.println(tasks[i]);
      }

      System.out.println("------processing--------------------");
      try {
         Long elapsed = System.currentTimeMillis() - startTime;
         while (elapsed < 15000) {
            Task task = taskQ.poll(1, TimeUnit.SECONDS);
            elapsed = System.currentTimeMillis() - startTime;
            if (task != null) {
               System.out.println("time = " + elapsed + "\t" + task);
            }
         }

         System.out.println("------remaining after " + elapsed + "ms -----------");
         tasks = taskQ.toArray(new Task[0]);
         for (int i = 0; i < tasks.length; i++) {
            System.out.println(tasks[i]);
         }

      } catch (InterruptedException e) {
      }
    }
    }
like image 831
nhoj Avatar asked Aug 27 '12 12:08

nhoj


2 Answers

Because your comapareTo method is full of flaws. Correct Implementation is as below. Once you change like below your all problems will get solve. Always try to reuse compareTo method if or adhere to compareTo Contract

return Long.valueOf(this.ripe).compareTo(that.ripe);
like image 142
Amit Deshpande Avatar answered Nov 12 '22 11:11

Amit Deshpande


The reason is due to numerical overflow.

Your compareTo() method is casting a long difference in nanoseconds to int, but more than 2.2 seconds worth of nanoseconds can not be held in an int and you'll get an overflow - giving more or less a random result, so the order in the queue may be behind one with a later expiry if it expires more than 2.2 seconds in the future.

poll() doesn't look beyond the next item in the queue, the order of which is defined by the compareTo method when the item is placed in the queue.


Also, equals() should to agree with hashCode(), as well as compareTo(). See javadoc for hashCode() for more info on this.

like image 5
Bohemian Avatar answered Nov 12 '22 11:11

Bohemian