
Tracking the progress between Queues in a Map

I currently have two queues and items traveling between them. Initially, an item gets put into firstQueue, then one of three dedicated threads moves it to secondQueue, and finally another dedicated thread removes it. These moves obviously include some processing. I need to be able to get the status of any item (IN_FIRST, AFTER_FIRST, IN_SECOND, AFTER_SECOND, or ABSENT), and I implemented it manually by updating statusMap wherever a queue gets modified, like this:

while (true) {
    Item i = firstQueue.take();
    statusMap.put(i, AFTER_FIRST);
    process(i);
    secondQueue.add(i);
    statusMap.put(i, IN_SECOND);
}

This works, but it's ugly and leaves a time window where the status is inconsistent. The inconsistency is no big deal and it'd be solvable by synchronization, but this could backfire as the queue is of limited capacity and may block. The ugliness bothers me more.

Efficiency hardly matters, as the processing takes seconds. Dedicated threads are used in order to control concurrency. No item should ever be in multiple states (but this is not very important and not guaranteed by my current racy approach). There'll be more queues (and states), and they'll be of different kinds (DelayQueue, ArrayBlockingQueue, and maybe PriorityQueue).

I wonder if there's a nice solution generalizable to multiple queues?

asked May 31 '18 13:05 by maaartinus


2 Answers

Your model could be improved in consistency, state control, and scalability.

One way to implement this is to couple the item with its state, enqueue and dequeue that pair, and create a mechanism that enforces the state change.

My proposal is shown in the figure below:

[figure: proposed model]

Following this model and your example, we can do:

package stackoverflow;

import java.util.concurrent.LinkedBlockingQueue;

import stackoverflow.item.ItemState;
import stackoverflow.task.CreatingTask;
import stackoverflow.task.FirstMovingTask;
import stackoverflow.task.SecondMovingTask;

public class Main {

    private static void startTask(String name, Runnable r){
        Thread t = new Thread(r, name);
        t.start();
    }

    public static void main(String[] args) {
        //create queues
        LinkedBlockingQueue<ItemState> firstQueue = new LinkedBlockingQueue<ItemState>();
        LinkedBlockingQueue<ItemState> secondQueue = new LinkedBlockingQueue<ItemState>();
        //start three threads
        startTask("Thread#1", new CreatingTask(firstQueue));
        startTask("Thread#2", new FirstMovingTask(firstQueue, secondQueue));
        startTask("Thread#3", new SecondMovingTask(secondQueue));
    }
}

Each task runs its op() operation on an ItemState, following this statement from the question:

one of three dedicated threads moves it to secondQueue and finally another dedicated thread removes it.

ItemState is an immutable object that holds the Item and its State. This keeps the Item and State values consistent with each other.

ItemState knows its next state, which gives a self-controlled state mechanism:

public class FirstMovingTask {
    // other code
    protected void op() {
        try {
            // dequeue: the copy constructor moves the item to its next state
            // (ItemState is an interface, so the implementation class is instantiated)
            ItemState is0 = new ItemStateImpl(firstQueue.take());
            System.out.println("Item " + is0.getItem().getValue() + ": " + is0.getState().getValue());
            // process here
            // enqueue: move to the next state again before handing over
            ItemState is1 = new ItemStateImpl(is0);
            secondQueue.add(is1);
            System.out.println("Item " + is1.getItem().getValue() + ": " + is1.getState().getValue());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // other code
}

With the ItemState implementation:

public class ItemStateImpl implements ItemState {
    private final Item item;
    private final State state;

    public ItemStateImpl(Item i){
        this.item = i;
        this.state = new State();
    }

    public ItemStateImpl(ItemState is) {
        this.item = is.getItem();
        this.state = is.getState().next();
    }

    // getters omitted
}

This way it is possible to build solutions that are more elegant, flexible, and scalable. Scalable because you can handle more states just by changing next(), and you can generalize the moving task to increase the number of queues.
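The State type and its next() method are not shown above. A minimal sketch, assuming the five states from the question form a fixed linear sequence, and modelling State as an enum purely for illustration (the answer's own State is a class with getValue(), so the names below are hypothetical):

```java
// Hypothetical sketch: the answer does not show its State class.
enum State {
    IN_FIRST, AFTER_FIRST, IN_SECOND, AFTER_SECOND, ABSENT;

    /** Returns the state that follows this one; ABSENT is terminal. */
    State next() {
        return this == ABSENT ? ABSENT : values()[ordinal() + 1];
    }
}

final class StateDemo {
    public static void main(String[] args) {
        // Walk the whole life cycle of one item.
        State s = State.IN_FIRST;
        while (s != State.ABSENT) {
            System.out.println(s);
            s = s.next();
        }
    }
}
```

With this shape, adding a third queue would only mean adding two constants to the enum; the moving tasks keep calling next().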

Results:

Item 0: AFTER_FIRST
Item 0: IN_FIRST
Item 0: IN_SECOND
Item 0: AFTER_SECOND
Item 1: IN_FIRST
Item 1: AFTER_FIRST
Item 1: IN_SECOND
Item 1: AFTER_SECOND
Item 2: IN_FIRST
Item 2: AFTER_FIRST
Item 2: IN_SECOND
... others

UPDATE (06/07/2018): analysing the use of a map for searching

Searching a map by comparing values with equals might not work, because the mapping between values and identity (key/hash) is usually not one-to-one (see the figure below). In that case a sorted list has to be created to search the values, which results in O(n) in the worst case.

[figure: mapping between values and keys/hashes]

with Item.getValuesHashCode():

private int getValuesHashCode(){
  return new HashCodeBuilder().append(value).hashCode();
}

In this case, you must keep a Vector<ItemState> instead of an Item and use the result of getValuesHashCode as the key. Change the state-control mechanism so that it keeps the first reference to the Item together with its current state. See below:

//Main.class
public static void main(String[] args) {
    // ... other code ...

    //references repository
    ConcurrentHashMap<Integer, Vector<ItemState>> statesMap = new ConcurrentHashMap<>();
    //start three threads
    startTask("Thread#1", new CreatingTask(firstQueue, statesMap));

    // ... other code ...
}

//CreatingTask.class
protected void op() throws InterruptedException {
    //create item
    ItemState is = new ItemStateImpl(new Item(i++, NameGenerator.name()));
    //register in the map; computeIfAbsent creates the bucket atomically,
    //so two creating threads cannot overwrite each other's Vector
    int key = is.getHashValue();
    map.computeIfAbsent(key, k -> new Vector<>()).add(is);
    //enqueue
    queue.put(is);
}

//FirstMovingTask.class
protected void op() throws InterruptedException{
    //dequeue
    ItemState is0 = firstQueue.take();
    //process
    ItemState is1 = process(is0.next());
    //enqueue 
    secondQueue.put(is1.next());
}

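//ItemState.class
public synchronized ItemState next() {
    //required for a consistent state change; the lock is on this, not on state,
    //because the state field is reassigned here and would change the lock object
    state = state.next();
    return this;
}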

To search, use concurrentMapRef.get(key). The result contains the reference to the updated ItemState.

Results in my tests for:
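The lookup described above can be sketched as follows. This is an illustration under assumptions, not the code from the linked repository: TrackedItem and StatusIndex are hypothetical stand-ins for ItemState and the map, the status is a plain String, and CopyOnWriteArrayList is used in place of Vector.

```java
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;

// Hypothetical stand-in for ItemState: a value plus a mutable status label.
final class TrackedItem {
    final String value;
    volatile String status = "IN_FIRST";
    TrackedItem(String value) { this.value = value; }
}

// Hypothetical index keyed by the hash of the value.
final class StatusIndex {
    // Several distinct values may share one key, so each key maps to a bucket.
    private final ConcurrentHashMap<Integer, List<TrackedItem>> buckets = new ConcurrentHashMap<>();

    void register(TrackedItem item) {
        // computeIfAbsent creates the bucket atomically.
        buckets.computeIfAbsent(item.value.hashCode(), k -> new CopyOnWriteArrayList<>()).add(item);
    }

    /** Returns every registered item whose value equals the argument; linear in the bucket size. */
    List<TrackedItem> findAll(String value) {
        List<TrackedItem> bucket = buckets.get(value.hashCode());
        if (bucket == null) return List.of();
        return bucket.stream().filter(t -> t.value.equals(value)).collect(Collectors.toList());
    }
}

final class StatusIndexDemo {
    public static void main(String[] args) {
        StatusIndex index = new StatusIndex();
        // "Aa" and "BB" have the same String.hashCode(), so they share a bucket.
        TrackedItem a = new TrackedItem("Aa");
        TrackedItem b = new TrackedItem("BB");
        index.register(a);
        index.register(b);
        b.status = "AFTER_FIRST"; // a worker updates the shared reference
        System.out.println(index.findAll("Aa").get(0).status); // IN_FIRST
        System.out.println(index.findAll("BB").get(0).status); // AFTER_FIRST
    }
}
```

Because the map stores references, a status change made by a worker is visible to the next lookup without touching the map again; the equals filter is what separates values that collide on the same hash.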

# key = hash("a")
# concurrentMapRef.get(key)
...
Item#7#0    : a - IN_FIRST 
... many others lines
Item#7#0    : a - AFTER_FIRST 
Item#12#1   : a - IN_FIRST 
... many others lines
Item#7#0    : a - IN_SECOND 
Item#12#1   : a - IN_FIRST 
... many others lines
Item#7#0    : a - AFTER_SECOND 
Item#12#1   : a - IN_FIRST 

More details in code: https://github.com/ag-studies/stackoverflow-queue

UPDATE (06/09/2018): redesign

Generalizing this project, I understand the state machine to be something like:

[figure: state machine]

This way I decoupled the workers from the queues to make the concepts clearer. I used a MemoryRep to keep a unique reference to each item throughout the whole process. Of course, you can use event-based strategies if you need to keep ItemState in a physical repository.

This keeps the previous idea and makes the concepts more readable. See this:

[figure: redesigned model]

As I understand it, each job will have two queues (input/output) and a relationship with a business model. Whoever searches will always find the most up-to-date and consistent state of the Item.
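The job-with-two-queues idea can be sketched roughly as below. Everything here is a hypothetical illustration rather than the repository's code: Tracked plays the role of ItemState, Job the role of a worker, and a plain ConcurrentHashMap stands in for MemoryRep.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper: one instance per item, registered once, advanced in place.
final class Tracked<T> {
    static final String[] STATES = {"IN_FIRST", "AFTER_FIRST", "IN_SECOND", "AFTER_SECOND", "ABSENT"};
    final T item;
    private final AtomicInteger step = new AtomicInteger(0);

    Tracked(T item) { this.item = item; }

    String state() { return STATES[step.get()]; }

    /** Advances to the following state and stays at the last one. */
    void next() { step.updateAndGet(i -> Math.min(i + 1, STATES.length - 1)); }
}

// Hypothetical job: one input queue and an optional output queue.
final class Job<T> {
    private final BlockingQueue<Tracked<T>> in;
    private final BlockingQueue<Tracked<T>> out; // null for the final job

    Job(BlockingQueue<Tracked<T>> in, BlockingQueue<Tracked<T>> out) {
        this.in = in;
        this.out = out;
    }

    /** Moves exactly one item through this job. */
    void step() throws InterruptedException {
        Tracked<T> t = in.take();
        t.next();                    // IN_x -> AFTER_x
        // ... process t.item here ...
        t.next();                    // AFTER_x -> IN_(x+1), or ABSENT after the last queue
        if (out != null) out.put(t); // state is set before put, so the next job never sees a stale one
    }
}

final class JobDemo {
    public static void main(String[] args) throws InterruptedException {
        Map<String, Tracked<String>> memoryRep = new ConcurrentHashMap<>();
        BlockingQueue<Tracked<String>> first = new LinkedBlockingQueue<>();
        BlockingQueue<Tracked<String>> second = new LinkedBlockingQueue<>();

        Tracked<String> t = new Tracked<>("a");
        memoryRep.put(t.item, t);
        first.put(t);

        new Job<>(first, second).step();
        System.out.println(memoryRep.get("a").state()); // IN_SECOND
        new Job<String>(second, null).step();
        System.out.println(memoryRep.get("a").state()); // ABSENT
    }
}
```

One trade-off in this sketch: the state switches to IN_SECOND just before put returns, so if the output queue is bounded and full, the item is briefly reported as queued while the job is still blocked.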

So, to answer your question:

  • I can find the consistent state of an Item anywhere by using MemoryRep (basically a Map), wrapping state and item in ItemState, and having the job control the state change when it enqueues or dequeues the item.

  • Performance is preserved, except for the cost of running next().

  • The state is always consistent (for your problem).

  • This model can use any queue type, any number of jobs/queues, and any number of states.

  • Additionally, this is beautiful!!

answered Oct 21 '22 07:10 by Aristofanio Garcia


Does it make sense to wrap the queues with logic to manage the Item status?

public class QueueWrapper<E> implements BlockingQueue<E> {
    // must be declared as BlockingQueue: plain Queue has no take()
    private final BlockingQueue<E> myQueue = new LinkedBlockingQueue<>();
    private final Map<E, Status> statusMap;

    public QueueWrapper(Map<E, Status> statusMap) {
        this.statusMap = statusMap;
    }

    [...]
    @Override
    public E take() throws InterruptedException {
        E result = myQueue.take();
        statusMap.put(result, Status.AFTER_FIRST);
        return result;
    }
}

That way status management is always related to (and contained in) queue operations...

Obviously statusMap needs to be synchronized, but that would be an issue anyway.
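To reuse the same wrapper for several queues, the two statuses could be passed in instead of being hard-coded. A minimal sketch under assumptions: TrackedQueue is a hypothetical name, it exposes only put and take rather than the full BlockingQueue interface, and the status type S is left generic.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical wrapper: records one status on put and another on take.
final class TrackedQueue<E, S> {
    private final BlockingQueue<E> delegate;
    private final Map<E, S> statusMap;
    private final S inQueue;
    private final S afterQueue;

    TrackedQueue(BlockingQueue<E> delegate, Map<E, S> statusMap, S inQueue, S afterQueue) {
        this.delegate = delegate;
        this.statusMap = statusMap;
        this.inQueue = inQueue;
        this.afterQueue = afterQueue;
    }

    void put(E e) throws InterruptedException {
        // Recorded before the hand-over, so a fast consumer's update is never overwritten.
        statusMap.put(e, inQueue);
        delegate.put(e);
    }

    E take() throws InterruptedException {
        E e = delegate.take();
        statusMap.put(e, afterQueue);
        return e;
    }
}

final class TrackedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        Map<String, String> statusMap = new ConcurrentHashMap<>();
        BlockingQueue<String> bounded = new ArrayBlockingQueue<>(2);
        TrackedQueue<String, String> first =
                new TrackedQueue<>(bounded, statusMap, "IN_FIRST", "AFTER_FIRST");

        first.put("item-1");
        System.out.println(statusMap.get("item-1")); // IN_FIRST
        first.take();
        System.out.println(statusMap.get("item-1")); // AFTER_FIRST
    }
}
```

Any BlockingQueue implementation can be passed as the delegate, so the same class would cover ArrayBlockingQueue and DelayQueue; a plain PriorityQueue is not a BlockingQueue and would need PriorityBlockingQueue or a different wrapper.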

answered Oct 21 '22 08:10 by DavidW