
Multithreaded execution where order of finished Work Items is preserved

I have a flow of units of work, let's call them "Work Items", that are processed sequentially (for now). I'd like to speed up processing by doing the work on multiple threads.

Constraint: the work items arrive in a specific order. During processing the order is not relevant, but once processing is finished the original order must be restored.

Something like this:

   |.|
   |.|
   |4|
   |3|
   |2|    <- incoming queue
   |1|
  / | \
 2  1  3  <- worker threads
  \ | /
   |3|
   |2|    <- outgoing queue
   |1|

I would like to solve this problem in Java, preferably without Executor Services, Futures, etc., but with basic concurrency methods like wait(), notify(), etc.

The reason: my Work Items are very small and fine-grained; each one finishes processing in about 0.2 milliseconds. So I fear that using stuff from java.util.concurrent.* might introduce way too much overhead and slow my code down.

The examples I have found so far all preserve the order during processing (which is irrelevant in my case) and don't care about the order after processing (which is crucial in my case).
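
For illustration, the structure described above (workers finishing in any order, output released strictly in arrival order) could be sketched with plain wait()/notifyAll() roughly as follows. This is only a sketch under assumptions that are not part of the question: each item is tagged with a sequence number on arrival, there is a single consumer, and the names ReorderBufferSketch, put, takeNext and run are hypothetical. The buffer is unbounded, so a real version would also need back-pressure.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReorderBufferSketch {
    // Finished results, keyed by the sequence number assigned on arrival.
    private final Map<Long, String> finished = new HashMap<>();
    // Sequence number of the next result the consumer may release.
    private long nextToRelease = 0;

    // Called by worker threads, in any order.
    synchronized void put(long seq, String result) {
        finished.put(seq, result);
        if (seq == nextToRelease) {
            notifyAll(); // wake the consumer only when its next item is ready
        }
    }

    // Called by the single consumer; blocks until the next item in sequence exists.
    synchronized String takeNext() throws InterruptedException {
        while (!finished.containsKey(nextToRelease)) {
            wait();
        }
        return finished.remove(nextToRelease++);
    }

    // Hypothetical driver: 'threads' workers each take every threads-th item.
    static List<String> run(int items, int threads) throws InterruptedException {
        ReorderBufferSketch buffer = new ReorderBufferSketch();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            final int offset = t;
            Thread w = new Thread(() -> {
                for (long seq = offset; seq < items; seq += threads) {
                    buffer.put(seq, "done-" + seq); // stand-in for real processing
                }
            });
            workers.add(w);
            w.start();
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < items; i++) {
            out.add(buffer.takeNext()); // released strictly in sequence order
        }
        for (Thread w : workers) {
            w.join();
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(10, 3));
    }
}
```

Whether this is actually faster than java.util.concurrent for 0.2 ms items is something only a measurement on the real workload can tell.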

asked Apr 05 '16 17:04 by Frizz



2 Answers

This is how I solved your problem in a previous project (but with java.util.concurrent):

(1) WorkItem class does the actual work/processing:

import java.util.concurrent.Callable;

public class WorkItem implements Callable<WorkItem> {
    Object content;

    public WorkItem(Object content) {
        super();
        this.content = content;
    }

    @Override
    public WorkItem call() throws Exception {
        // read content + do your processing here
        return this; // the processed item is the result of the Future
    }
}

(2) This class puts Work Items in a queue and initiates processing:

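public class Producer {
    ...
    public Producer() {
        super();
        // Bounded queue of futures; it keeps the submission order.
        workerQueue = new ArrayBlockingQueue<Future<WorkItem>>(THREADS_TO_USE);
        // Note: the completion queue of this service is never polled here, so completed
        // futures pile up in it; a plain ExecutorService returns the same kind of Future.
        completionService = new ExecutorCompletionService<WorkItem>(Executors.newFixedThreadPool(THREADS_TO_USE));
        workerThread = new Thread(new Worker(workerQueue));
        workerThread.start();
    }

    public void send(Object o) throws Exception {
        WorkItem workItem = new WorkItem(o);
        Future<WorkItem> future = completionService.submit(workItem); // starts processing
        workerQueue.put(future); // blocks while THREADS_TO_USE items are already waiting
    }
}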

(3) Once processing is finished the Work Items are dequeued here:

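import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class Worker implements Runnable {
    private final ArrayBlockingQueue<Future<WorkItem>> workerQueue;

    public Worker(ArrayBlockingQueue<Future<WorkItem>> workerQueue) {
        this.workerQueue = workerQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Future<WorkItem> fwi = workerQueue.take(); // dequeue the oldest submitted item
                WorkItem done = fwi.get(); // wait until it has finished processing
                // 'done' arrives here in submission order; pass it on from this point
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and stop the loop
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause()); // a work item threw during call()
        }
    }
}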

(4) This is how you would use the stuff in your code and submit new work:

public class MainApp {
    public static void main(String[] args) throws Exception {
        Producer p = new Producer();
        for (int i = 0; i < 10000; i++)
            p.send(i);
    }
}
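
For readers who want to try the idea in one file, here is a compact, self-contained sketch of the same technique (submit tasks to a pool, keep their futures in a bounded FIFO queue, and have one consumer wait on the futures in that order). It is not the code from the project above: the class OrderedPipelineSketch, the method processInOrder, the squaring task and the queue capacity of threads * 4 are all assumptions made for illustration, and it uses a plain ExecutorService instead of an ExecutorCompletionService.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class OrderedPipelineSketch {

    // Hypothetical helper: processes 'count' items on 'threads' workers and
    // returns the results in submission order.
    static List<Integer> processInOrder(int count, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        // Bounded FIFO of futures: the producer blocks when it is full.
        BlockingQueue<Future<Integer>> inFlight = new ArrayBlockingQueue<>(threads * 4);
        List<Integer> results = new ArrayList<>();

        // Consumer: takes futures in submission order and waits for each in turn.
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    results.add(inFlight.take().get());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                // Sketch only: a real pipeline would also have to unblock the producer.
                throw new IllegalStateException(e.getCause());
            }
        });
        consumer.start();

        for (int i = 0; i < count; i++) {
            final int n = i;
            Callable<Integer> task = () -> n * n; // stand-in for the real work
            inFlight.put(pool.submit(task));
        }

        consumer.join();  // after join(), 'results' is safely visible to this thread
        pool.shutdown();  // lets the JVM exit once all tasks are done
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processInOrder(20, 4));
    }
}
```

The tasks may complete in any order inside the pool; the output order comes solely from the FIFO queue of futures.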
answered Sep 28 '22 10:09 by Rollergirl


If you allow BlockingQueue, why would you ignore the rest of the concurrency utilities in Java? You could, for example, use a parallel Stream (if you have Java 8) for the above:

List<Type> data = ...;
List<Other> out = data.parallelStream()
    .map(t -> doSomeWork(t))
    .collect(Collectors.toList());

Because you start from an ordered Collection (a List) and also collect into a List, the results come out in the same order as the input, even though the map step runs in parallel.
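
A small runnable check of that ordering claim might look like the following. The names ParallelStreamOrderSketch, processAll and the body of doSomeWork are hypothetical stand-ins; the snippet above leaves the data and the work elided. By default a parallel stream runs on the common ForkJoinPool.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ParallelStreamOrderSketch {

    // Hypothetical stand-in for the real per-item processing.
    static String doSomeWork(int item) {
        return "item-" + item;
    }

    // Maps every element in parallel; Collectors.toList() keeps the
    // encounter order of the source list.
    static List<String> processAll(List<Integer> data) {
        return data.parallelStream()
                .map(ParallelStreamOrderSketch::doSomeWork)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> data = IntStream.rangeClosed(1, 10)
                .boxed()
                .collect(Collectors.toList());
        System.out.println(processAll(data));
    }
}
```

Note that this fits a batch that is already in a List; for a continuous flow of incoming items, a queue-based approach is probably the closer match.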

answered Sep 28 '22 11:09 by Krzysztof Krasoń