How can I make a Java PriorityBlockingQueue that preserves FIFO behavior?

I'm trying to create a Priority Blocking Queue in Java that maintains FIFO order for elements with the same priority. The Oracle doc provides some help with that, but I'm still very tangled up.

I should note that the following topics are all very new to me: Generics, Interfaces as Types, and static nested classes. All of these come into play in the following class definition. Generics, especially, are confusing, and I'm sure I've totally messed up with them here.

I have included comments to identify the compiler errors I am currently getting.

Several specific questions:

  1. Is it okay to have the class represent the queued event object, with the actual queue being a static class member?

  2. Was it reasonable to have included Oracle's FIFO event "wrapper" as a static nested class?

  3. Am I at least on the right track here, doing it all in one outer class?

Here is the class that I've written:

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public class FIFOPBQEvent {

/**
 * First we define a static nested class which, when instantiated,
 * encapsulates the "guts" of the event - a FIFOPBQEvent - along with
 * a sequence number that assures FIFO behavior of events of like priority.
 *  
 * The following is lifted ALMOST verbatim (I added "static" and some 
 * comments) from Oracle documentation on adding FIFO-ness to a Priority
 * Blocking Queue: 
 * http://download.oracle.com/javase/6/docs/api/java/util/concurrent/PriorityBlockingQueue.html
 * As the Oracle doc points out:
 * 
 * "A static nested class interacts with the instance members of its outer 
 * class (and other classes) just like any other top-level class. In 
 * effect, a static nested class is behaviorally a top-level class that 
 * has been nested in another top-level class for packaging convenience."
 *
 */
static class FIFOEntry<E extends Comparable<? super E>> implements
        Comparable<FIFOEntry<E>> {
    final static AtomicLong seq = new AtomicLong();
    final long seqNum;  // instance
    final E entry;

    public FIFOEntry(E entry) {
        seqNum = seq.getAndIncrement();
        this.entry = entry;
    }

    public E getEntry() {
        return entry;
    }
    /** Here is implementation of Comparable */
    public int compareTo(FIFOEntry<E> other) {
        int res = entry.compareTo(other.entry);
        if (res == 0 && other.entry != this.entry)
            res = (seqNum < other.seqNum ? -1 : 1);
        return res;
    }
}

/**
 * Now we declare a single (static) PBQ of FIFO entries into which 
 * PBQFIFOEvents will be added and removed.
 */

/** FLAGGED AS ERROR BY COMPILER */
// Bound mismatch: The type FIFOPBQEvent is not a valid substitute for the
// bounded parameter <E extends Comparable<? super E>> of the type 
// FIFOPBQEvent.FIFOEntry<E>

private static PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>> theQueue =
    new PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>>();

/** 
 * And here are the "guts" of our event: the i.d. and state of the GUI widget 
 */
private ConsoleObject obj = ConsoleObject.UNDEFINED_OBJ; // widget that was affected
private ObjectState state = ObjectState.UNDEFINED_STATE; // the widget's new state

/** 
 * Constructor specifying the class variables 
 */
public FIFOPBQEvent(ConsoleObject theObj, ObjectState theState) {
    obj = theObj;
    state = theState;
}

/**
 * Event queuing ("sending") and dequeuing ("receiving")
 */
public void sendEvent() {

    /** FLAGGED AS ERROR BY COMPILER */
    // The method put(FIFOPBQEvent.FIFOEntry<FIFOPBQEvent>) in the type 
    // PriorityBlockingQueue<FIFOPBQEvent.FIFOEntry<FIFOPBQEvent>> is not 
    // applicable for the arguments (FIFOPBQEvent)

    theQueue.put(this);
}

public static FIFOPBQEvent receiveEvent() {

    /** FLAGGED AS ERROR BY COMPILER */
    // Type mismatch: cannot convert from FIFOPBQEvent.FIFOEntry<FIFOPBQEvent> 
    // to FIFOPBQEvent

    FIFOPBQEvent event = theQueue.take();
    return event;
}

/**
 * ConsoleEvent accessors
 */
public ConsoleObject getObj() {
    return this.obj;
}

public ObjectState getState() {
    return this.state;
}

/**
 * And for the first time, enums instead of public static final ints.
 */
public enum ConsoleObject {
    UNDEFINED_OBJ,
    RESERVED,

    /** Console keys */
    RESET,
    DISPLAY_MAR,
    SAVE,
    INSERT,
    RELEASE,
    START,
    SIE,
    SCE,

    /** Console toggle switches */
    POWER,
    PARITY_CHECK,
    IO_CHECK,
    OVERFLOW_CHECK,
    SENSE_SWITCH_1,
    SENSE_SWITCH_2,
    SENSE_SWITCH_3,
    SENSE_SWITCH_4
}

public enum ObjectState {
    UNDEFINED_STATE,

    /** Toggle switches */
    OFF,
    ON,

    /** Console keys */
    PRESSED,
}
}
Chap asked Jul 08 '11 05:07


3 Answers

The first error is the more significant one. It occurs because the FIFOPBQEvent class doesn't implement Comparable, which it must in order to be used as the type argument of the FIFOEntry nested class: you restricted E by declaring that it extends Comparable<? super E>. In other words, your FIFOPBQEvent class must be comparable so that it can supply the priority for the queue (presumably based on the event type).

To fix the error, you need to:

  1. Change the header of your class to:

    public class FIFOPBQEvent implements Comparable<FIFOPBQEvent> {
    
  2. Add a compareTo method to the FIFOPBQEvent class, something like:

    public int compareTo (FIFOPBQEvent other) {
        // TODO - compare this and other for priority
        return 0;
    }
    

Then you need to wrap your entry in your sendEvent method:

public void sendEvent () {
    theQueue.put(new FIFOEntry<FIFOPBQEvent> (this));
}

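The last, minor error is simply that you aren't unwrapping the FIFOEntry object. To fix this, change receiveEvent to the following (take() can throw InterruptedException, so the method has to declare or handle it):

public static FIFOPBQEvent receiveEvent () throws InterruptedException {
    FIFOPBQEvent event = theQueue.take ().getEntry ();
    return event;
}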
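To see the wrapping and unwrapping work end to end, here is a minimal, self-contained sketch. It assumes a hypothetical PrioritizedEvent class with an integer priority field (lower value served first); that class, its fields, and FifoDemo are illustration only and do not appear in the question. The FIFOEntry wrapper follows the one in the PriorityBlockingQueue Javadoc.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical event type: the "priority" field is an assumption made for
// illustration; the class in the question only carries a widget and its state.
class PrioritizedEvent implements Comparable<PrioritizedEvent> {
    final int priority;   // lower value = served first (assumed convention)
    final String label;

    PrioritizedEvent(int priority, String label) {
        this.priority = priority;
        this.label = label;
    }

    @Override
    public int compareTo(PrioritizedEvent other) {
        return Integer.compare(priority, other.priority);
    }
}

// Wrapper that breaks priority ties by arrival order.
class FIFOEntry<E extends Comparable<? super E>> implements Comparable<FIFOEntry<E>> {
    private static final AtomicLong seq = new AtomicLong();
    private final long seqNum = seq.getAndIncrement();
    private final E entry;

    FIFOEntry(E entry) { this.entry = entry; }

    E getEntry() { return entry; }

    @Override
    public int compareTo(FIFOEntry<E> other) {
        int res = entry.compareTo(other.entry);      // priority first
        if (res == 0 && other.entry != this.entry) {
            res = (seqNum < other.seqNum ? -1 : 1);  // then FIFO
        }
        return res;
    }
}

class FifoDemo {
    // Enqueues five events and returns their labels in dequeue order.
    static List<String> drainLabels() {
        PriorityBlockingQueue<FIFOEntry<PrioritizedEvent>> queue = new PriorityBlockingQueue<>();
        queue.put(new FIFOEntry<>(new PrioritizedEvent(2, "low-1")));
        queue.put(new FIFOEntry<>(new PrioritizedEvent(1, "high-1")));
        queue.put(new FIFOEntry<>(new PrioritizedEvent(2, "low-2")));
        queue.put(new FIFOEntry<>(new PrioritizedEvent(1, "high-2")));
        queue.put(new FIFOEntry<>(new PrioritizedEvent(2, "low-3")));

        List<String> labels = new ArrayList<>();
        FIFOEntry<PrioritizedEvent> wrapped;
        while ((wrapped = queue.poll()) != null) {
            labels.add(wrapped.getEntry().label);    // unwrap before use
        }
        return labels;
    }
}
```

Calling FifoDemo.drainLabels() yields high-1, high-2, low-1, low-2, low-3: both priority-1 events come out first, and within each priority the arrival order is kept.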
101100 answered Oct 22 '22 20:10


Let's step through your code.

static class FIFOEntry<E extends Comparable<? super E>> implements 
  Comparable<FIFOEntry<E>> {

This defines the class FIFOEntry, which takes a generic parameter. You have constrained that parameter to "any type that implements Comparable of itself" (or, because of the ? super E wildcard, Comparable of one of its supertypes).
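As a small illustration of what that bound admits, here is a sketch using made-up classes (Reading, CalibratedReading and BoundDemo are hypothetical names, not part of the question). A subclass that only inherits Comparable of its parent still satisfies E extends Comparable<? super E>, whereas it would not satisfy the stricter E extends Comparable<E>.

```java
// Hypothetical base type that is comparable to itself.
class Reading implements Comparable<Reading> {
    final int value;

    Reading(int value) { this.value = value; }

    @Override
    public int compareTo(Reading other) {
        return Integer.compare(value, other.value);
    }
}

// Inherits Comparable<Reading>; it is NOT Comparable<CalibratedReading>.
class CalibratedReading extends Reading {
    CalibratedReading(int value) { super(value); }
}

class BoundDemo {
    // Same bound as FIFOEntry's type parameter. CalibratedReading is accepted
    // because Comparable<Reading> matches Comparable<? super CalibratedReading>.
    static <E extends Comparable<? super E>> E smaller(E a, E b) {
        return a.compareTo(b) <= 0 ? a : b;
    }
}
```

A class that implements no Comparable at all, like the original FIFOPBQEvent, fails the bound either way, which is exactly the "Bound mismatch" the compiler reports.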

private static PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>> theQueue =
  new PriorityBlockingQueue<FIFOEntry<FIFOPBQEvent>>();

Your declaration of the PriorityBlockingQueue itself is fine, but the type argument in FIFOEntry<FIFOPBQEvent> is not. This follows from the point above: you have restricted FIFOEntry's type parameter to types that implement Comparable of themselves, so the event class should be declared as

class FIFOPBQEvent implements Comparable<FIFOPBQEvent> {

Your next problem is:

public void sendEvent() {
  theQueue.put(this);
}

The type of this is FIFOPBQEvent but the queue only accepts FIFOEntry objects. To match your Queue signature it should be:

public void sendEvent() {
  // Requires FIFOPBQEvent to be Comparable
  theQueue.put(new FIFOEntry<FIFOPBQEvent>(this));     
}

You have the same problem in receiveEvent() too: the queue's type says that it contains FIFOEntry objects, and you are trying to pull out FIFOPBQEvents directly.
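A minimal sketch of the receiving side, under some assumptions: Wrapped is a simplified stand-in for FIFOEntry (the sequence number is left out for brevity), the payload is a plain String, and ReceiveDemo is a hypothetical holder class. It also shows one common way to deal with the InterruptedException that take() can throw; returning null on interruption is a choice made here for illustration, not something the question prescribes.

```java
import java.util.concurrent.PriorityBlockingQueue;

// Simplified stand-in for the wrapper in the question (no sequence number).
class Wrapped<E extends Comparable<? super E>> implements Comparable<Wrapped<E>> {
    private final E entry;

    Wrapped(E entry) { this.entry = entry; }

    E getEntry() { return entry; }

    @Override
    public int compareTo(Wrapped<E> other) {
        return entry.compareTo(other.entry);
    }
}

class ReceiveDemo {
    private static final PriorityBlockingQueue<Wrapped<String>> theQueue =
            new PriorityBlockingQueue<>();

    // The queue stores wrappers, so the payload is wrapped on the way in...
    static void send(String payload) {
        theQueue.put(new Wrapped<>(payload));
    }

    // ...and unwrapped on the way out. take() blocks while the queue is empty.
    static String receive() {
        try {
            return theQueue.take().getEntry();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return null;                        // assumed convention: null means "interrupted"
        }
    }
}
```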

Bringer128 answered Oct 22 '22 21:10


Taking @101100's recommendation, I have reworked the design, decoupling the queue from the events. That seems to make it much simpler and easier to understand (and reuse), but sadly I'm still unclear on some concepts. What follows here is the PriorityFIFOEventQueue (I've omitted the Event class for brevity). And I've noted where I still need some help:

package ibm1620;

import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/**
 * This class represents a Priority Blocking Queue of FIFO entries.  Instantiating
 * it creates a queue.  It provides instance methods for sending and receiving
 * entries, a.k.a. events of type E, on the queue.
 */

The following is flagged with the diagnostic: "The type PriorityFIFOEventQueue<E> must implement the inherited abstract method Comparable<PriorityFIFOEventQueue<E>>.compareTo(PriorityFIFOEventQueue<E>)"

I'm pretty sure I don't want to compare queues! Still not sure what I need here.

public class PriorityFIFOEventQueue<E extends Comparable<? super E>> implements Comparable<PriorityFIFOEventQueue<E>> {

/**
 * FIFOEntry is a static nested class that wraps an Entry and provides support for
 * keeping the Entries in FIFO sequence.
 */
private static class FIFOEntry<E> implements Comparable<FIFOEntry<E>> {

    /**
     * There's going to be one atomic seq per ... queue?  runtime?
     */

    final static AtomicLong seq = new AtomicLong();

    /**
     * FIFOEntry is simply an entry of type E, and a sequence number
     */
    final long seqNum; 
    final E    entry;

    /**
     * FIFOEntry() constructor
     */
    FIFOEntry(E entry) {
        seqNum = seq.getAndIncrement();
        this.entry = entry;
    }

    /**
     * Accessor method for entry
     */
    E getEntry() {
        return entry;
    }

    /**
     * Here is implementation of Comparable that is called directly by PBQ.
     * We first invoke E's comparator which compares based on priority. 
     * If equal priority, order by sequence number (i.e. maintain FIFO).
     * */

In the following, the line containing "compareTo" is flagged, and the diagnostic is "The method compareTo(E) is undefined for the type E". Apparently I haven't told the compiler that the "other" FIFOEntry implements Comparable.

    public int compareTo(FIFOEntry<E> other) {
        int res = entry.compareTo(other.entry); // priority-based compare
        if (res == 0 && other.entry != this.entry)
            res = (seqNum < other.seqNum ? -1 : 1); // FIFO compare
        return res;
    }
}

/**
 * Here is the PriorityBlockingQueue of FIFOEntries
 */

private PriorityBlockingQueue<FIFOEntry<E>> theQueue = 
    new PriorityBlockingQueue<FIFOEntry<E>>();

/**
 * Event queuing ("sending") and dequeuing ("receiving")
 */
public void sendEvent(E event) {
    theQueue.put(new FIFOEntry<E>(event));
}

/**
 * pollEvent() 
 * Will remove and return a ConsoleEvent from the queue, or return
 * null if queue is empty.
 */
public E pollEvent() {
    E event = null;
    FIFOEntry<E> aFIFOEvent = theQueue.poll();
    if (aFIFOEvent != null) {
        event = aFIFOEvent.getEntry();
        say("pollEvent() -> "+event);
    }
    else {
    }
    return event;
}

/**
 * receiveEvent() 
 * Will block if queue is empty.  Takes a FIFOEvent off the queue,
 * unwraps it and returns the 'E' payload.
 * 
 * @return
 */
public E receiveEvent() {
    E event = null;
    try {
        FIFOEntry<E> aFIFOEvent = theQueue.take();
        if (aFIFOEvent != null) {
            event = aFIFOEvent.getEntry();
            say("receiveEvent() -> "+event);
        }

    } catch (InterruptedException e) {
        say("InterruptedException in receiveEvent()");
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    return event;
}

// for console debugging
static void say(String s) {
    System.out.println("ConsoleEvent: " + s);
}

}
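For reference, here is one way the two remaining diagnostics could be resolved; treat it as a sketch under stated assumptions rather than a definitive design. The outer class drops implements Comparable<...>, since the queue itself is never compared, and the nested class's type parameter gets the same Comparable bound as the outer one so that entry.compareTo(...) resolves. The nested parameter is renamed T only to make it obvious that it is a separate type variable from the outer E. The Job class is hypothetical and exists only to exercise the queue; the blocking receiveEvent() and the say() helper are omitted for brevity.

```java
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// No "implements Comparable" here: only the entries are compared, not the queue.
class PriorityFIFOEventQueue<E extends Comparable<? super E>> {

    // The bound on T is what lets the compiler find entry.compareTo(...).
    private static class FIFOEntry<T extends Comparable<? super T>>
            implements Comparable<FIFOEntry<T>> {
        // static: a single counter shared by every queue instance
        private static final AtomicLong seq = new AtomicLong();
        private final long seqNum = seq.getAndIncrement();
        private final T entry;

        FIFOEntry(T entry) { this.entry = entry; }

        T getEntry() { return entry; }

        @Override
        public int compareTo(FIFOEntry<T> other) {
            int res = entry.compareTo(other.entry);      // priority-based compare
            if (res == 0 && other.entry != this.entry) {
                res = (seqNum < other.seqNum ? -1 : 1);  // FIFO compare
            }
            return res;
        }
    }

    private final PriorityBlockingQueue<FIFOEntry<E>> theQueue =
            new PriorityBlockingQueue<>();

    public void sendEvent(E event) {
        theQueue.put(new FIFOEntry<>(event));
    }

    // Returns the next event, or null if the queue is empty.
    public E pollEvent() {
        FIFOEntry<E> wrapped = theQueue.poll();
        return wrapped == null ? null : wrapped.getEntry();
    }
}

// Hypothetical payload type, used only to try the queue out.
class Job implements Comparable<Job> {
    final int priority;   // lower value = served first (assumed convention)
    final String name;

    Job(int priority, String name) {
        this.priority = priority;
        this.name = name;
    }

    @Override
    public int compareTo(Job other) {
        return Integer.compare(priority, other.priority);
    }
}
```

On the question in the comment about the sequence counter: because seq is static, there is one counter shared by all FIFOEntry instances loaded by the same class loader, regardless of the type argument or of which queue they end up in. That is enough for FIFO ordering within any one queue; making seq an instance field of the queue would give one counter per queue instead.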
Chap answered Oct 22 '22 19:10