
Thread pool to process messages in parallel, but preserve order within conversations

I need to process messages in parallel, but preserve the processing order of messages with the same conversation ID.

Example:
Let's define a Message like this:

class Message {
    Message(long id, long conversationId, String someData) {...}
}

Suppose the messages arrive in the following order:
Message(1, 1, "a1"), Message(2, 2, "a2"), Message(3, 1, "b1"), Message(4, 2, "b2").

I need message 3 to be processed after message 1, since messages 1 and 3 have the same conversation ID (similarly, message 4 should be processed after message 2 for the same reason).
I don't care about the relative order between e.g. 1 and 2, since they have different conversation IDs.

I would like to reuse Java's ThreadPoolExecutor functionality as much as possible, to avoid having to replace dead threads manually in my code, etc.

Update: The number of possible 'conversation-ids' is not limited, and there is no time limit on a conversation. (I personally don't see it as a problem, since I can have a simple mapping from a conversationId to a worker number, e.g. conversationId % totalWorkers).

Update 2: There is one problem with a solution based on multiple queues, where the queue number is determined by e.g. 'index = Objects.hash(conversationId) % total': if it takes a long time to process some message, all messages with the same 'index' but a different 'conversationId' will wait, even though other threads are available to handle them. That is, I believe solutions with a single smart blocking queue would be better, but that's just an opinion; I am open to any good solution.
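
To make the collision concrete, here is a small sketch that applies the formula above to the four conversation IDs from the example. The class and method names are hypothetical, and it uses Math.floorMod instead of % so that a negative hash cannot produce a negative index (an assumption added for safety, not part of the formula as written).

```java
import java.util.Objects;

// Hypothetical demo class: shows which queue each conversation ID lands on.
class StripeCollisionDemo {

    // Same idea as 'index = Objects.hash(conversationId) % total',
    // with floorMod so the result is always in [0, total).
    static int index(long conversationId, int total) {
        return Math.floorMod(Objects.hash(conversationId), total);
    }

    public static void main(String[] args) {
        int total = 2; // number of queues / workers
        for (long conversationId = 1; conversationId <= 4; conversationId++) {
            System.out.println("conversation " + conversationId
                    + " -> queue " + index(conversationId, total));
        }
        // Prints:
        // conversation 1 -> queue 0
        // conversation 2 -> queue 1
        // conversation 3 -> queue 0
        // conversation 4 -> queue 1
    }
}
```

Conversations 1 and 3 share queue 0, so a slow message in conversation 1 delays conversation 3 even if the worker for queue 1 is idle.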

Do you see an elegant solution for this problem?

asked Jun 02 '17 11:06 by Alexander


4 Answers

I'm not sure how you want the messages to be processed, so for convenience each message here is a Runnable whose run() method does the processing.

The idea is to have a fixed number of Executors, each owning its own queue and each submitted to a shared ExecutorService. A modulo operation on the conversation ID decides which Executor receives an incoming message. Messages with the same conversation ID always go to the same Executor, so they are processed sequentially, while the Executors themselves run in parallel. It's not guaranteed that messages with different conversation IDs will always execute in parallel: two IDs can map to the same Executor, and you are bounded by the number of physical cores in your system anyway.

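import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.IntStream;

public class MessageExecutor {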

    public interface Message extends Runnable {

        long getId();

        long getConversationId();

        String getMessage();

    }

    private static class Executor implements Runnable {

        private final LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

        private volatile boolean stopped;

        void schedule(Message message) {
            messages.add(message);
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
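            // Poll with a timeout instead of take(), so the loop notices stop();
            // keep going until the queue is drained so scheduled messages are not dropped.
            while (!stopped || !messages.isEmpty()) {
                try {
                    Message message = messages.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
                    if (message != null) {
                        message.run();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    System.err.println(e.getMessage());
                }
            }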
        }
    }

    private final Executor[] executors;
    private final ExecutorService executorService;

    public MessageExecutor(int poolCount) {
        executorService = Executors.newFixedThreadPool(poolCount);
        executors = new Executor[poolCount];

        IntStream.range(0, poolCount).forEach(i -> {
            Executor executor = new Executor();
            executorService.submit(executor);
            executors[i] = executor;
        });
    }

    public void submit(Message message) {
        // floorMod keeps the index non-negative even when the hash is negative.
        final int executorNr = Math.floorMod(Objects.hash(message.getConversationId()), executors.length);
        executors[executorNr].schedule(message);
    }

    public void stop() {
        Arrays.stream(executors).forEach(Executor::stop);
        executorService.shutdown();
    }
}

You can then start the message executor with a pool size and submit messages to it.

public static void main(String[] args) {
    MessageExecutor messageExecutor = new MessageExecutor(Runtime.getRuntime().availableProcessors());
    messageExecutor.submit(new Message() {
        @Override
        public long getId() {
            return 1;
        }

        @Override
        public long getConversationId() {
            return 1;
        }

        @Override
        public String getMessage() {
            return "abc1";
        }

        @Override
        public void run() {
            System.out.println(this.getMessage());
        }
    });
    messageExecutor.submit(new Message() {
        @Override
        public long getId() {
            return 2;
        }

        @Override
        public long getConversationId() {
            return 2;
        }

        @Override
        public String getMessage() {
            return "abc2";
        }

        @Override
        public void run() {
            System.out.println(this.getMessage());
        }
    });
    messageExecutor.stop();
}

When I run with a pool count of 2 and submit a number of messages:

Message with conversation id [1] is scheduled on scheduler #[0]
Message with conversation id [2] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [4] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [22] is scheduled on scheduler #[1]
Message with conversation id [1] is scheduled on scheduler #[0]
Message with conversation id [2] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[0]
Message with conversation id [4] is scheduled on scheduler #[1]

When the same messages run with a pool count of 3:

Message with conversation id [1] is scheduled on scheduler #[2]
Message with conversation id [2] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [4] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [22] is scheduled on scheduler #[2]
Message with conversation id [1] is scheduled on scheduler #[2]
Message with conversation id [2] is scheduled on scheduler #[0]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [3] is scheduled on scheduler #[1]
Message with conversation id [4] is scheduled on scheduler #[2]

Messages get distributed nicely among the pool of Executors :).

EDIT: the Executor's run() catches all Exceptions, to ensure it does not break when one message fails.
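
A more compact variant of the same striping idea can lean on the JDK for the per-stripe queue and thread: one single-thread executor per stripe. This is only a sketch under assumptions. The names StripedExecutor and shutdownAndWait are hypothetical, and the index uses Long.hashCode with Math.floorMod, so the stripe numbers will not match the log output above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical name: one single-thread executor ("stripe") per slot.
class StripedExecutor {

    private final ExecutorService[] stripes;

    StripedExecutor(int stripeCount) {
        stripes = new ExecutorService[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            // Each stripe has one worker thread and its own FIFO queue.
            stripes[i] = Executors.newSingleThreadExecutor();
        }
    }

    void submit(long conversationId, Runnable task) {
        // The same conversation always maps to the same stripe, hence FIFO order.
        int index = Math.floorMod(Long.hashCode(conversationId), stripes.length);
        stripes[index].execute(task);
    }

    // Stops accepting work, then waits for already queued tasks to finish.
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        for (ExecutorService stripe : stripes) {
            stripe.shutdown();
        }
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        try {
            for (ExecutorService stripe : stripes) {
                long remaining = deadline - System.nanoTime();
                if (!stripe.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        StripedExecutor executor = new StripedExecutor(2);
        List<String> processed = new CopyOnWriteArrayList<>();
        executor.submit(1, () -> processed.add("a1"));
        executor.submit(2, () -> processed.add("a2"));
        executor.submit(1, () -> processed.add("b1"));
        executor.submit(2, () -> processed.add("b2"));
        executor.shutdownAndWait(5, TimeUnit.SECONDS);
        // "a1" always precedes "b1" and "a2" always precedes "b2";
        // the interleaving between the two conversations may vary.
        System.out.println(processed);
    }
}
```

The trade-off is unchanged: a slow message still delays every conversation that hashes to the same stripe.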

answered Oct 11 '22 00:10 by Velth


I had to do something very similar some time ago, so here is an adaptation.

(See it in action online)

It's actually the exact same base need, but in my case the key was a String and, more importantly, the set of keys did not grow indefinitely. Here it does, so I had to add a "cleanup scheduler". Other than that it's basically the same code, so I hope I have not lost anything serious in the adaptation. I tested it and it looks like it works. It's longer than the other solutions, though, and perhaps more complex.

Base idea:

  • MessageTask wraps a message into a Runnable, and notifies queue when it is complete
  • ConvoQueue: blocking queue of messages, for a conversation. Acts as a prequeue that guarantees desired order. See this trio in particular: ConvoQueue.runNextIfPossible() → MessageTask.run() → ConvoQueue.complete() → …
  • MessageProcessor has a Map<Long, ConvoQueue>, and an ExecutorService
  • messages are processed by any thread in the executor, the ConvoQueues feed the ExecutorService and guarantee message order per convo, but not globally (so a "difficult" message will not block other conversations from being processed, unlike some other solutions, and that property was critically important in our case -- if it's not that critical for you, maybe a simpler solution is better)
  • cleanup with ScheduledExecutorService (takes 1 thread)

Visually:

   ConvoQueues              ExecutorService's internal queue
                            (shared, but has at most 1 MessageTask per convo)
Convo 1   ########   
Convo 2      #####   
Convo 3    #######                        Thread 1
Convo 4              } →    ####    → {
Convo 5        ###                        Thread 2
Convo 6  #########   
Convo 7      #####   

(Convo 4 is about to be deleted)

Below are all the classes (MessageProcessorTest can be executed directly):

// MessageProcessor.java
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.SECONDS;

public class MessageProcessor {

    private static final long CLEANUP_PERIOD_S = 10;
    private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>();
    private final ExecutorService executorService;

    public MessageProcessor(int nbThreads) {
        executorService = Executors.newFixedThreadPool(nbThreads);
        ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1);
        cleanupScheduler.scheduleAtFixedRate(this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS);
    }

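    public void addMessageToProcess(Message message) {
        // Look up and enqueue under the same lock, so the cleanup task cannot
        // remove the queue between the lookup and the add.
        synchronized (queuesByConvo) {
            getQueue(message.getConversationId()).addMessage(message);
        }
    }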

    private ConvoQueue getQueue(Long convoId) {
        synchronized (queuesByConvo) {
            return queuesByConvo.computeIfAbsent(convoId, p -> new ConvoQueue(executorService));
        }
    }

    private void removeEmptyQueues() {
        synchronized (queuesByConvo) {
            queuesByConvo.entrySet().removeIf(entry -> entry.getValue().isEmpty());
        }
    }

}


// ConvoQueue.java
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

class ConvoQueue {

    private Queue<MessageTask> queue;
    private MessageTask activeTask;
    private ExecutorService executorService;

    ConvoQueue(ExecutorService executorService) {
        this.executorService = executorService;
        this.queue = new LinkedBlockingQueue<>();
    }

    private void runNextIfPossible() {
        synchronized(this) {
            if (activeTask == null) {
                activeTask = queue.poll();
                if (activeTask != null) {
                    executorService.submit(activeTask);
                }
            }
        }
    }

    void complete(MessageTask task) {
        synchronized(this) {
            if (task == activeTask) {
                activeTask = null;
                runNextIfPossible();
            }
            else {
                throw new IllegalStateException("Attempt to complete task that is not supposed to be active: "+task);
            }
        }
    }

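    boolean isEmpty() {
        // A queue whose last task is still running is not idle yet: removing it now would let
        // a newer message of the same conversation start in parallel on a fresh queue.
        synchronized(this) {
            return activeTask == null && queue.isEmpty();
        }
    }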

    void addMessage(Message message) {
        add(new MessageTask(this, message));
    }

    private void add(MessageTask task) {
        synchronized(this) {
            queue.add(task);
            runNextIfPossible();
        }
    }

}

// MessageTask.java
public class MessageTask implements Runnable {

    private ConvoQueue convoQueue;
    private Message message;

    MessageTask(ConvoQueue convoQueue, Message message) {
        this.convoQueue = convoQueue;
        this.message = message;
    }

    @Override
    public void run() {
        try {
            processMessage();
        }
        finally {
            convoQueue.complete(this);
        }
    }

    private void processMessage() {
        // Dummy processing with random delay to observe reordered messages & preserved convo order
        try {
            Thread.sleep((long) (50*Math.random()));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(message);
    }

}

// Message.java
class Message {

    private long id;
    private long conversationId;
    private String data;

    Message(long id, long conversationId, String someData) {
        this.id = id;
        this.conversationId = conversationId;
        this.data = someData;
    }

    long getConversationId() {
        return conversationId;
    }

    String getData() {
        return data;
    }

    public String toString() {
        return "Message{" + id + "," + conversationId + "," + data + "}";
    }
}

// MessageProcessorTest.java
public class MessageProcessorTest {
    public static void main(String[] args) {
        MessageProcessor test = new MessageProcessor(2);
        for (int i=1; i<100; i++) {
            test.addMessageToProcess(new Message(1000+i,i%7,"hi "+i));
        }
    }
}

Output (for each convo ID (2nd field) order is preserved):

Message{1002,2,hi 2}
Message{1001,1,hi 1}
Message{1004,4,hi 4}
Message{1003,3,hi 3}
Message{1005,5,hi 5}
Message{1006,6,hi 6}
Message{1009,2,hi 9}
Message{1007,0,hi 7}
Message{1008,1,hi 8}
Message{1011,4,hi 11}
Message{1010,3,hi 10}
...
Message{1097,6,hi 97}
Message{1095,4,hi 95}
Message{1098,0,hi 98}
Message{1099,1,hi 99}
Message{1096,5,hi 96}

The test above gave me enough confidence to share it, but I'm slightly worried that I might have overlooked details for pathological cases. It has been running in production for years without a hitch (although with more code that lets us inspect it live when we need to see what's happening, why a certain queue takes time, etc.). There was never a problem with the system above in itself, only sometimes with the processing of a particular task.

Edit: click here to test online. Alternative: copy that gist in there, and press "Compile & Execute".

answered Oct 11 '22 00:10 by Hugues M.


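You essentially want the work to be done sequentially within a conversation. One solution would be to synchronize on a mutex that is unique to that conversation. The drawback of that solution is that if conversations are short-lived and new conversations start frequently, the "mutexes" map will grow fast. Also note that a pool thread waiting on a mutex is blocked, and that synchronized guarantees mutual exclusion but not fairness, so the order within a conversation relies on the tasks reaching the mutex in submission order.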

For brevity's sake I've omitted the executor shutdown, actual message processing, exception handling etc.

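import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MessageProcessor {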

  private final ExecutorService executor;
  private final ConcurrentMap<Long, Object> mutexes = new ConcurrentHashMap<> ();

  public MessageProcessor(int threadCount) {
    executor = Executors.newFixedThreadPool(threadCount);
  }

  public static void main(String[] args) throws InterruptedException {
    MessageProcessor p = new MessageProcessor(10);
    BlockingQueue<Message> queue = new ArrayBlockingQueue<> (1000);

    //some other thread populates the queue

    while (true) {
      Message m = queue.take();
      p.process(m);
    }
  }

  public void process(Message m) {
    Object mutex = mutexes.computeIfAbsent(m.getConversationId(), id -> new Object());
    executor.submit(() -> {
      synchronized(mutex) {
        //That's where you actually process the message
      }
    });
  }
}
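
As a hedged alternative to the mutex (a different technique, not the code above), each conversation can keep the tail of a CompletableFuture chain, so the next message only starts once the previous one has finished and no pool thread blocks while waiting. The class name ChainedProcessor and its process method are hypothetical. The sketch assumes an executor with an unbounded queue, and it removes a conversation's map entry when its chain runs dry, which addresses the growing-map drawback.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical name: chains the tasks of one conversation one after another.
class ChainedProcessor {

    private final ExecutorService executor;
    // Tail of the task chain for each conversation that still has pending work.
    private final ConcurrentMap<Long, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    ChainedProcessor(ExecutorService executor) {
        this.executor = executor;
    }

    CompletableFuture<Void> process(long conversationId, Runnable task) {
        // compute() is atomic per key, so appending to a chain cannot race.
        CompletableFuture<Void> tail = tails.compute(conversationId, (id, previous) -> {
            CompletableFuture<Void> start = (previous == null)
                    ? CompletableFuture.completedFuture(null)
                    // Ignore a failure of the previous message so the chain keeps going.
                    : previous.exceptionally(error -> null);
            return start.thenRunAsync(task, executor);
        });
        // Drop the entry if this task is still the last one when it completes.
        tail.whenComplete((result, error) -> tails.remove(conversationId, tail));
        return tail;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        ChainedProcessor processor = new ChainedProcessor(pool);
        CompletableFuture<?>[] done = new CompletableFuture<?>[8];
        for (int i = 0; i < done.length; i++) {
            long conversationId = i % 2 + 1;
            String label = "conversation " + conversationId + ", message " + (i + 1);
            done[i] = processor.process(conversationId, () -> System.out.println(label));
        }
        CompletableFuture.allOf(done).join(); // wait for every message
        pool.shutdown();
    }
}
```

The order within a conversation follows the order of the process() calls, so this assumes a single thread (or an otherwise ordered source) submits the messages of any one conversation.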

answered Oct 10 '22 22:10 by assylias


I had a similar problem in my application. My first solution was to sort the messages using a java.util.concurrent.ConcurrentHashMap. In your case, this would be a ConcurrentHashMap with the conversationId as key and a list of messages as value. The problem was that the map grew too big and took too much space.

My current solution is the following: one thread receives the messages and stores them in a java.util.ArrayList. After receiving N messages, it pushes the list to a second thread. This thread sorts the messages by conversationId and id using the ArrayList.sort method. Then the thread iterates through the sorted list and searches for blocks which can be processed. Each block which can be processed is taken out of the list. To process a block, you can create a Runnable with this block and push it to an executor service. The messages which could not be processed remain in the list and will be checked in the next round.
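
The sort-and-split step might look like the sketch below. It rests on several assumptions: the names BatchSorter, toBlocks and processBatch are hypothetical, ids are assumed to increase in arrival order, every block is treated as processable (the rule for deciding which blocks must wait is not spelled out above), and a batch is finished completely before the next one starts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical name: sorts a batch and cuts it into one block per conversation.
class BatchSorter {

    // Minimal stand-in for the Message class from the question.
    static final class Message {
        final long id;
        final long conversationId;
        final String data;

        Message(long id, long conversationId, String data) {
            this.id = id;
            this.conversationId = conversationId;
            this.data = data;
        }

        @Override
        public String toString() {
            return String.valueOf(id);
        }
    }

    // Sorts by (conversationId, id) and groups consecutive messages of one conversation.
    static List<List<Message>> toBlocks(List<Message> batch) {
        List<Message> sorted = new ArrayList<>(batch);
        sorted.sort(Comparator.comparingLong((Message m) -> m.conversationId)
                .thenComparingLong(m -> m.id));
        List<List<Message>> blocks = new ArrayList<>();
        for (Message m : sorted) {
            if (blocks.isEmpty()
                    || blocks.get(blocks.size() - 1).get(0).conversationId != m.conversationId) {
                blocks.add(new ArrayList<>());
            }
            blocks.get(blocks.size() - 1).add(m);
        }
        return blocks;
    }

    // Runs each block as one task; invokeAll returns once every block is done,
    // so the next batch cannot overtake this one.
    static void processBatch(List<Message> batch, ExecutorService executor,
                             Consumer<Message> handler) throws InterruptedException {
        List<Callable<Void>> tasks = new ArrayList<>();
        for (List<Message> block : toBlocks(batch)) {
            tasks.add(() -> {
                block.forEach(handler);
                return null;
            });
        }
        executor.invokeAll(tasks);
    }

    public static void main(String[] args) throws InterruptedException {
        List<Message> batch = Arrays.asList(
                new Message(1, 1, "a1"), new Message(2, 2, "a2"),
                new Message(3, 1, "b1"), new Message(4, 2, "b2"));
        System.out.println(toBlocks(batch)); // prints [[1, 3], [2, 4]]
        ExecutorService executor = Executors.newFixedThreadPool(2);
        processBatch(batch, executor, m -> System.out.println(m.conversationId + ": " + m.data));
        executor.shutdown();
    }
}
```

Waiting for the whole batch keeps the sketch simple, at the cost of letting one slow block hold up the next batch.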

answered Oct 10 '22 22:10 by Thomas Krieger