I need to process messages in parallel, but preserve the processing order of messages with the same conversation ID.
Example:
Let's define a Message like this:
class Message {
Message(long id, long conversationId, String someData) {...}
}
Suppose the messages arrive in the following order:
Message(1, 1, "a1"), Message(2, 2, "a2"), Message(3, 1, "b1"), Message(4, 2, "b2").
I need message 3 to be processed after message 1, since messages 1 and 3 have the same conversation ID (similarly, message 4 should be processed after message 2, for the same reason).
I don't care about the relative order between e.g. 1 and 2, since they have different conversation IDs.
I would like to reuse Java's ThreadPoolExecutor functionality as much as possible, to avoid having to replace dead threads manually in my code, etc.
Update: The number of possible 'conversation-ids' is not limited, and there is no time limit on a conversation. (I personally don't see it as a problem, since I can have a simple mapping from a conversationId to a worker number, e.g. conversationId % totalWorkers).
Update 2: There is one problem with a multi-queue solution in which the queue number is determined by e.g. 'index = Objects.hash(conversationId) % total': if some message takes a long time to process, all messages with the same 'index' but a different 'conversationId' will wait, even though other threads are available to handle them. That is why I believe a solution with a single smart blocking queue would be better, but that is just an opinion; I am open to any good solution.
Do you see an elegant solution for this problem?
I'm not sure how you want the messages to be processed, so for convenience each message is a Runnable whose run() method does the processing.
The solution is to have a number of Executors, each of which is submitted to a shared ExecutorService. Use the modulo operation to calculate which Executor an incoming message is dispatched to. For the same conversation ID it is always the same Executor, so you get parallel processing overall but sequential processing within a conversation. It is not guaranteed that messages with different conversation IDs always execute in parallel: two conversations that map to the same Executor wait on each other, and in any case you are bounded by the number of physical cores in your system.
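import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class MessageExecutor {

    public interface Message extends Runnable {
        long getId();
        long getConversationId();
        String getMessage();
    }

    // One worker loop with its own FIFO queue; each instance occupies one pool thread.
    private static class Executor implements Runnable {
        private final LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
        private volatile boolean stopped;

        void schedule(Message message) {
            messages.add(message);
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            // Keep going until stop() has been called and the queue is drained.
            while (!stopped || !messages.isEmpty()) {
                try {
                    // poll() with a timeout instead of take(), so the loop re-checks
                    // the stopped flag even when no message arrives.
                    Message message = messages.poll(100, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        message.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    // A failing message must not kill the worker loop.
                    System.err.println(e.getMessage());
                }
            }
        }
    }

    private final Executor[] executors;
    private final ExecutorService executorService;

    public MessageExecutor(int poolCount) {
        executorService = Executors.newFixedThreadPool(poolCount);
        executors = new Executor[poolCount];
        IntStream.range(0, poolCount).forEach(i -> {
            Executor executor = new Executor();
            executorService.submit(executor);
            executors[i] = executor;
        });
    }

    public void submit(Message message) {
        // floorMod keeps the index non-negative; with %, a negative hash
        // (e.g. for conversation ID 2147483648) would yield a negative index.
        final int executorNr = Math.floorMod(Objects.hash(message.getConversationId()), executors.length);
        executors[executorNr].schedule(message);
    }

    public void stop() {
        Arrays.stream(executors).forEach(Executor::stop);
        executorService.shutdown();
    }
}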
You can then start the message executor with a pool count and submit messages to it.
public static void main(String[] args) {
    MessageExecutor messageExecutor = new MessageExecutor(Runtime.getRuntime().availableProcessors());

    messageExecutor.submit(new Message() {
        @Override
        public long getId() {
            return 1;
        }

        @Override
        public long getConversationId() {
            return 1;
        }

        @Override
        public String getMessage() {
            return "abc1";
        }

        @Override
        public void run() {
            System.out.println(this.getMessage());
        }
    });

    messageExecutor.submit(new Message() {
        @Override
        public long getId() {
            return 2;
        }

        @Override
        public long getConversationId() {
            return 2;
        }

        @Override
        public String getMessage() {
            return "abc2";
        }

        @Override
        public void run() {
            System.out.println(this.getMessage());
        }
    });

    messageExecutor.stop();
}
When I run with a pool count of 2 and submit a number of messages:
Message with conversation id [1] is scheduled on scheduler #[0]
Message with conversation id [2] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [4] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [1] is scheduled on scheduler #[0]
Message with conversation id [2] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [4] is scheduled on scheduler #[1]
When the same messages are submitted with a pool count of 3:
Message with conversation id [1] is scheduled on scheduler #[2]
Message with conversation id [2] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [4] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [1] is scheduled on scheduler #[2]
Message with conversation id [2] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [4] is scheduled on scheduler #[2]
Messages get distributed nicely among the pool of Executors :).
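The scheduler numbers in the logs above can be reproduced with a few lines. This is only a sketch: `SchedulerIndexDemo` and `indexFor` are hypothetical names, not part of MessageExecutor, and it uses `Math.floorMod` rather than `%`, which gives the same result for these IDs but can never return a negative index.

```java
import java.util.Objects;

// Hypothetical helper class; not part of the answer's MessageExecutor.
class SchedulerIndexDemo {

    // Maps a conversation ID to a scheduler slot in the range [0, poolCount).
    static int indexFor(long conversationId, int poolCount) {
        return Math.floorMod(Objects.hash(conversationId), poolCount);
    }

    public static void main(String[] args) {
        long[] conversationIds = {1, 2, 3, 4, 22};
        for (int poolCount : new int[] {2, 3}) {
            for (long id : conversationIds) {
                System.out.println("pool count " + poolCount + ": conversation id [" + id
                        + "] -> scheduler #[" + indexFor(id, poolCount) + "]");
            }
        }
    }
}
```

With a pool count of 3, conversation IDs 1, 4 and 22 all map to scheduler 2, so a slow message in any one of them delays the other two.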
EDIT: the Executor's run() catches all Exceptions, to ensure it does not break when one message fails.
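The same modulo routing can also be sketched without a hand-written worker loop, by keeping one single-thread ExecutorService per slot. This is a variant under my own assumptions, not the code above: `StripedExecutor`, `execute` and `shutdownAndWait` are hypothetical names. It relies on the documented behaviour of `Executors.newSingleThreadExecutor()`, which runs tasks sequentially and replaces its worker thread if a task kills it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical variant, not the answer's class: one single-thread executor per slot.
class StripedExecutor {
    private final ExecutorService[] stripes;

    StripedExecutor(int stripeCount) {
        stripes = new ExecutorService[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Tasks with the same conversation ID always land on the same stripe,
    // and a single-thread executor runs its tasks in submission order.
    void execute(long conversationId, Runnable task) {
        int index = Math.floorMod(Long.hashCode(conversationId), stripes.length);
        stripes[index].execute(task);
    }

    // Stops accepting new tasks, then waits for the queued ones to finish.
    boolean shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException {
        for (ExecutorService stripe : stripes) {
            stripe.shutdown();
        }
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (ExecutorService stripe : stripes) {
            long remaining = Math.max(0, deadline - System.nanoTime());
            if (!stripe.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        StripedExecutor executor = new StripedExecutor(2);
        executor.execute(1, () -> System.out.println("a1"));
        executor.execute(2, () -> System.out.println("a2"));
        executor.execute(1, () -> System.out.println("b1"));
        executor.execute(2, () -> System.out.println("b2"));
        executor.shutdownAndWait(5, TimeUnit.SECONDS);
    }
}
```

The limitation from the question's second update still applies: conversations that share a stripe wait on each other even when other stripes are idle.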
I had to do something very similar some time ago, so here is an adaptation.
It's actually the exact same base need, but in my case the key was a String and, more importantly, the set of keys did not grow indefinitely, so for this answer I had to add a "cleanup scheduler". Other than that it's basically the same code, so I hope I have not lost anything serious in the adaptation. I tested it, and it looks like it works. It's longer than the other solutions, though, and perhaps more complex...
Base idea:
- MessageTask wraps a message into a Runnable, and notifies the queue when it is complete
- ConvoQueue: blocking queue of messages, for a conversation. Acts as a prequeue that guarantees the desired order. See this trio in particular: ConvoQueue.runNextIfPossible() → MessageTask.run() → ConvoQueue.complete() → …
- MessageProcessor has a Map<Long, ConvoQueue>, and an ExecutorService
- ConvoQueues feed the ExecutorService and guarantee message order per convo, but not globally (so a "difficult" message will not block other conversations from being processed, unlike some other solutions, and that property was critically important in our case; if it's not that critical for you, maybe a simpler solution is better)
- ScheduledExecutorService (takes 1 thread): periodically removes empty ConvoQueues

Visually:
ConvoQueues             ExecutorService's internal queue
                        (shared, but has at most 1 MessageTask per convo)
Convo 1   ########
Convo 2      #####
Convo 3    #######                          Thread 1
Convo 4              } →     ####     → {
Convo 5        ###                          Thread 2
Convo 6  #########
Convo 7      #####

(Convo 4 is about to be deleted)
All the classes are below (MessageProcessorTest can be executed directly):
// MessageProcessor.java
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.SECONDS;

public class MessageProcessor {
    private static final long CLEANUP_PERIOD_S = 10;
    private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>();
    private final ExecutorService executorService;

    public MessageProcessor(int nbThreads) {
        executorService = Executors.newFixedThreadPool(nbThreads);
        ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1);
        cleanupScheduler.scheduleAtFixedRate(this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS);
    }

    public void addMessageToProcess(Message message) {
        // Look up the queue and enqueue under the same lock, so the cleanup task
        // cannot remove the queue between the lookup and the add.
        synchronized (queuesByConvo) {
            ConvoQueue queue = queuesByConvo.computeIfAbsent(
                    message.getConversationId(), id -> new ConvoQueue(executorService));
            queue.addMessage(message);
        }
    }

    private void removeEmptyQueues() {
        synchronized (queuesByConvo) {
            queuesByConvo.entrySet().removeIf(entry -> entry.getValue().isEmpty());
        }
    }
}
// ConvoQueue.java
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

class ConvoQueue {
    private final Queue<MessageTask> queue;
    private final ExecutorService executorService;
    // The single task of this convo currently handed to the ExecutorService, if any.
    private MessageTask activeTask;

    ConvoQueue(ExecutorService executorService) {
        this.executorService = executorService;
        this.queue = new LinkedBlockingQueue<>();
    }

    private void runNextIfPossible() {
        synchronized (this) {
            if (activeTask == null) {
                activeTask = queue.poll();
                if (activeTask != null) {
                    executorService.submit(activeTask);
                }
            }
        }
    }

    // Called by MessageTask.run() when its message has been processed.
    void complete(MessageTask task) {
        synchronized (this) {
            if (task == activeTask) {
                activeTask = null;
                runNextIfPossible();
            } else {
                throw new IllegalStateException("Attempt to complete task that is not supposed to be active: " + task);
            }
        }
    }

    boolean isEmpty() {
        // Also check the active task: a queue whose last message is still running
        // must not be cleaned up, or a new queue for the same convo could overtake it.
        synchronized (this) {
            return activeTask == null && queue.isEmpty();
        }
    }

    void addMessage(Message message) {
        add(new MessageTask(this, message));
    }

    private void add(MessageTask task) {
        synchronized (this) {
            queue.add(task);
            runNextIfPossible();
        }
    }
}
// MessageTask.java
public class MessageTask implements Runnable {
    private ConvoQueue convoQueue;
    private Message message;

    MessageTask(ConvoQueue convoQueue, Message message) {
        this.convoQueue = convoQueue;
        this.message = message;
    }

    @Override
    public void run() {
        try {
            processMessage();
        } finally {
            convoQueue.complete(this);
        }
    }

    private void processMessage() {
        // Dummy processing with random delay to observe reordered messages & preserved convo order
        try {
            Thread.sleep((long) (50 * Math.random()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(message);
    }
}
// Message.java
class Message {
    private long id;
    private long conversationId;
    private String data;

    Message(long id, long conversationId, String someData) {
        this.id = id;
        this.conversationId = conversationId;
        this.data = someData;
    }

    long getConversationId() {
        return conversationId;
    }

    String getData() {
        return data;
    }

    public String toString() {
        return "Message{" + id + "," + conversationId + "," + data + "}";
    }
}
// MessageProcessorTest.java
public class MessageProcessorTest {
    public static void main(String[] args) {
        MessageProcessor test = new MessageProcessor(2);
        for (int i = 1; i < 100; i++) {
            test.addMessageToProcess(new Message(1000 + i, i % 7, "hi " + i));
        }
    }
}
Output (for each convo ID (2nd field) order is preserved):
Message{1002,2,hi 2}
Message{1001,1,hi 1}
Message{1004,4,hi 4}
Message{1003,3,hi 3}
Message{1005,5,hi 5}
Message{1006,6,hi 6}
Message{1009,2,hi 9}
Message{1007,0,hi 7}
Message{1008,1,hi 8}
Message{1011,4,hi 11}
Message{1010,3,hi 10}
...
Message{1097,6,hi 97}
Message{1095,4,hi 95}
Message{1098,0,hi 98}
Message{1099,1,hi 99}
Message{1096,5,hi 96}
The test above gave me enough confidence to share this, but I'm slightly worried that I might have forgotten details for pathological cases. The original has been running in production for years without a hitch (although with more code, which lets us inspect it live when we need to see what's happening, why a certain queue takes time, etc.). There was never a problem with the system above in itself, but sometimes with the processing of a particular task.
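You essentially want the work to be done sequentially within a conversation. One solution would be to synchronize on a mutex that is unique to that conversation. The drawback of that solution is that if conversations are short-lived and new conversations start frequently, the "mutexes" map will grow fast. Note also that the mutex only prevents two messages of the same conversation from running at the same time: if they are picked up by different pool threads, which one acquires the lock first is not guaranteed, and a thread waiting on the mutex is blocked rather than free for other work.
For brevity's sake I've omitted the executor shutdown, actual message processing, exception handling, etc.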
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MessageProcessor {
    private final ExecutorService executor;
    // One lock object per conversation ID; entries are never removed.
    private final ConcurrentMap<Long, Object> mutexes = new ConcurrentHashMap<>();

    public MessageProcessor(int threadCount) {
        executor = Executors.newFixedThreadPool(threadCount);
    }

    public static void main(String[] args) throws InterruptedException {
        MessageProcessor p = new MessageProcessor(10);
        BlockingQueue<Message> queue = new ArrayBlockingQueue<>(1000);
        // some other thread populates the queue
        while (true) {
            Message m = queue.take();
            p.process(m);
        }
    }

    public void process(Message m) {
        Object mutex = mutexes.computeIfAbsent(m.getConversationId(), id -> new Object());
        executor.submit(() -> {
            synchronized (mutex) {
                // That's where you actually process the message
            }
        });
    }
}
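As a hedged alternative to the mutex map, here is a sketch that chains each conversation's tasks with CompletableFuture, so a task is only handed to the pool once the previous task of the same conversation has finished. It swaps in a different technique from the code above. `ChainedProcessor`, `trackedConversations` and `shutdownAndWait` are hypothetical names, and it assumes `process` is called from a single dispatcher thread, as in the `main` loop above. Entries are removed once a conversation has no pending work, which addresses the growing-map drawback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical class; names are illustrative, not from the answer above.
class ChainedProcessor {
    private final ExecutorService executor;
    // Per conversation: the future of the most recently submitted task.
    private final ConcurrentHashMap<Long, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    ChainedProcessor(int threadCount) {
        executor = Executors.newFixedThreadPool(threadCount);
    }

    void process(long conversationId, Runnable work) {
        // compute() is atomic per key, so tasks are chained in the order process() is called.
        CompletableFuture<Void> next = tails.compute(conversationId, (id, tail) -> {
            CompletableFuture<Void> previous =
                    (tail == null) ? CompletableFuture.<Void>completedFuture(null) : tail;
            return previous
                    .thenRunAsync(work, executor)
                    // Report a failure, but keep the chain usable for later messages.
                    .exceptionally(e -> {
                        System.err.println("Task failed: " + e);
                        return null;
                    });
        });
        // Registered outside compute(): drop the entry once this task is done,
        // unless a newer task of the same conversation has already replaced it.
        next.whenComplete((result, error) -> tails.remove(conversationId, next));
    }

    int trackedConversations() {
        return tails.size();
    }

    boolean shutdownAndWait(long timeout, TimeUnit unit) throws InterruptedException {
        executor.shutdown();
        return executor.awaitTermination(timeout, unit);
    }

    public static void main(String[] args) throws InterruptedException {
        ChainedProcessor processor = new ChainedProcessor(4);
        processor.process(1, () -> System.out.println("a1"));
        processor.process(2, () -> System.out.println("a2"));
        processor.process(1, () -> System.out.println("b1"));
        processor.process(2, () -> System.out.println("b2"));
        processor.shutdownAndWait(5, TimeUnit.SECONDS);
    }
}
```

Because no pool thread ever waits on a lock, a slow conversation occupies at most one thread, and the remaining threads stay available for other conversations.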
I had a similar problem in my application. My first solution was sorting the messages using a java.util.concurrent.ConcurrentHashMap. In your case, this would be a ConcurrentHashMap with the conversationId as key and a list of messages as value. The problem was that the map got too big and took too much space.
My current solution is the following: one thread receives the messages and stores them in a java.util.ArrayList. After receiving N messages, it pushes the list to a second thread. This thread sorts the messages with ArrayList.sort, by conversationId and then by id. Then the thread iterates through the sorted list and searches for blocks which can be processed. Each block which can be processed is taken out of the list. To process a block, you can create a Runnable with this block and push it to an executor service. The messages which could not be processed remain in the list and will be checked in the next round.
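The batch approach described above could be sketched roughly as follows. This is a simplified illustration, not the original implementation. `BatchProcessor`, `Msg`, `toBlocks` and `processBatch` are hypothetical names. It assumes that message ids increase in arrival order, treats every conversation's messages within a batch as one processable block, and waits for the whole batch to finish before returning, so that a later batch cannot overtake an earlier one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical sketch of the "sort a batch, then process per-conversation blocks" idea.
class BatchProcessor {

    static final class Msg {
        final long id;
        final long conversationId;
        final String data;

        Msg(long id, long conversationId, String data) {
            this.id = id;
            this.conversationId = conversationId;
            this.data = data;
        }
    }

    // Sorts a copy of the batch by (conversationId, id) and splits it into one block per conversation.
    static List<List<Msg>> toBlocks(List<Msg> batch) {
        List<Msg> sorted = new ArrayList<>(batch);
        sorted.sort(Comparator.comparingLong((Msg m) -> m.conversationId)
                .thenComparingLong(m -> m.id));
        List<List<Msg>> blocks = new ArrayList<>();
        for (Msg m : sorted) {
            boolean startsNewBlock = blocks.isEmpty()
                    || blocks.get(blocks.size() - 1).get(0).conversationId != m.conversationId;
            if (startsNewBlock) {
                blocks.add(new ArrayList<>());
            }
            blocks.get(blocks.size() - 1).add(m);
        }
        return blocks;
    }

    // Runs the blocks in parallel; the messages inside one block are handled sequentially.
    static void processBatch(List<Msg> batch, ExecutorService pool, Consumer<Msg> handler)
            throws InterruptedException {
        List<Callable<Void>> jobs = new ArrayList<>();
        for (List<Msg> block : toBlocks(batch)) {
            jobs.add(() -> {
                block.forEach(handler);
                return null;
            });
        }
        pool.invokeAll(jobs); // blocks until every block of this batch is done
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Msg> batch = Arrays.asList(
                new Msg(1, 1, "a1"), new Msg(2, 2, "a2"), new Msg(3, 1, "b1"), new Msg(4, 2, "b2"));
        processBatch(batch, pool, m -> System.out.println(m.id + " " + m.data));
        pool.shutdown();
    }
}
```

Waiting for each batch trades some throughput for simplicity: one slow block holds back the start of the next batch, which the carry-over list in the description above is presumably meant to avoid.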