I have multiple task producers that add work to a queue. I also have multiple consumers that feed off that queue. Since the queue is FIFO, tasks are dequeued in the same order they were added.
In my scenario, tasks are added to the queue from HTTP requests. Each task is associated with an account and there is no rate-limiting. Therefore it is possible to have tasks from one account flood the message queue.
To solve this, I've been looking for a queue implementation that allows me to process enqueued tasks from multiple accounts in round-robin fashion for fairness.
I've currently resorted to using Redis with some Lua scripts thrown in to emulate a round robin queue but was wondering if there are any existing queueing topologies that accomplish this?
I would suggest using a Guava Multimap:
import com.google.common.collect.TreeMultimap;
import java.util.LinkedHashSet;
public class QueueTest {
public static void main(String[] args) {
TreeMultimap<String, String> multimap = TreeMultimap.create();
multimap.put("c1", "TaskC11");
multimap.put("c1", "TaskC12");
multimap.put("c1", "TaskC13");
multimap.put("c2", "TaskC21");
multimap.put("c3", "TaskC31");
while (multimap.size() > 0) {
for (String customer : new LinkedHashSet<>(multimap.keySet())) {
String taskToProcess = multimap.get(customer).pollFirst();
System.out.println(taskToProcess);
}
}
}
}
Result:
TaskC11
TaskC21
TaskC31
TaskC12
TaskC13
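You can also supply custom comparators to manage priority for each customer. Note that TreeMultimap keeps each customer's tasks sorted (natural ordering by default) rather than in insertion order, and it ignores duplicate key-value pairs.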
I usually do it like this:
Instead of putting tasks directly into the work queue, make a separate task queue for each account. Each request puts a task into its account queue, and when the account queue goes from empty to non-empty, put the account queue into the global work queue.
Workers take account queues from the work queue when they are ready for more work. When a worker takes an account queue, it takes out the first task and the worker immediately puts the account queue back at the end of the work queue if it's not empty. Then the worker performs the task.
Using this system, each account queue is in the work queue at most once, and all accounts with associated work are equally represented in the work queue.
This is pretty easy to implement, but you do have to be careful about detecting when you have to put an account queue into the work queue, since there can be two threads making this decision at the same time, and you don't want the account queue to go in twice.
I make it simple like this:
Have an atomic Boolean in each account queue that keeps track of whether or not it's in the work queue. The worker sets this to false immediately after dequeuing the account queue. If anyone finds the account queue non-empty, they can try to CAS this boolean to true, and if successful put the account queue into the work queue.
There's a small chance that an account queue could get into the work queue when it's empty. Make sure this is harmless -- if a worker fails to take a task from an account queue, it should just forget about it and take a new account queue from the work queue.
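To make the scheme above concrete, here is a minimal in-process sketch. The names FairWorkQueue, AccountQueue, submit, poll and schedule are made up for illustration, and it assumes plain java.util.concurrent collections rather than any particular message broker, so treat it as one possible shape and not a finished implementation.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of the per-account queue scheme; all names are illustrative. */
class FairWorkQueue<T> {

    /** One FIFO of tasks per account, plus the "am I in the work queue?" flag. */
    private static final class AccountQueue<T> {
        final Queue<T> tasks = new ConcurrentLinkedQueue<>();
        final AtomicBoolean scheduled = new AtomicBoolean(false);
    }

    // Assumption: account queues are never removed from this map (fine for a sketch).
    private final ConcurrentMap<String, AccountQueue<T>> accounts = new ConcurrentHashMap<>();
    private final Queue<AccountQueue<T>> workQueue = new ConcurrentLinkedQueue<>();

    /** Producer side: enqueue the task, then make sure its account is scheduled. */
    public void submit(String account, T task) {
        AccountQueue<T> q = accounts.computeIfAbsent(account, k -> new AccountQueue<>());
        q.tasks.add(task);
        schedule(q);
    }

    /** Only the thread that wins the CAS puts the account queue into the work queue. */
    private void schedule(AccountQueue<T> q) {
        if (!q.tasks.isEmpty() && q.scheduled.compareAndSet(false, true)) {
            workQueue.add(q);
        }
    }

    /** Worker side: returns the next task in round-robin order, or null if none is queued. */
    public T poll() {
        AccountQueue<T> q;
        while ((q = workQueue.poll()) != null) {
            q.scheduled.set(false);  // clear the flag right after dequeuing
            T task = q.tasks.poll();
            schedule(q);             // back to the end of the line if tasks remain
            if (task != null) {
                return task;
            }
            // The account queue turned out to be empty: harmless, try the next one.
        }
        return null;
    }

    public static void main(String[] args) {
        FairWorkQueue<String> queue = new FairWorkQueue<>();
        queue.submit("a1", "a1-task1");
        queue.submit("a1", "a1-task2");
        queue.submit("a1", "a1-task3");
        queue.submit("a2", "a2-task1");
        queue.submit("a3", "a3-task1");
        String task;
        while ((task = queue.poll()) != null) {
            System.out.println(task);
        }
        // Prints: a1-task1, a2-task1, a3-task1, a1-task2, a1-task3 (one per line)
    }
}
```

The ordering matters here: the producer adds the task before it checks the flag, and the worker clears the flag before it checks whether tasks remain, so at least one of them always notices pending work. A real worker pool would probably block on the work queue (for example with a LinkedBlockingQueue) instead of returning null, and would clean up idle account queues.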
With a RabbitMQ Direct Exchange and Spring AMQP you can implement a queuing topology that holds a queue for each account, all connected to a single exchange. If you send messages to the exchange with the account name as the routing key and have a single consumer bound to all of the queues, the consumer will receive the messages round robin (see "Direct exchanges and load balance").
The problem with this setup is that you might end up with quite a few queues (one for each account), and at least in my implementation (attached below as a simple Spring Boot application) you have to "restart" the consumer each time a new account comes in, because that means there is a new queue to attach the consumer to. I don't know whether this scales or performs well. Check this post for the maximum number of queues in RabbitMQ and whether that might affect you.
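import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)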
@SpringBootTest(classes = RoundRobin.RoundRobinQueueConfiguration.class)
public class RoundRobin {
private static final String EXCHANGE = "round-robin-exchange";
private final List<String> tasks = Arrays.asList( // account(a):task(t) where t holds the expected order of consumption
"a1:t1", "a2:t2", "a3:t3", // make sure, a queue for every account (a) exists
"a1:t4", "a1:t7", "a1:t9", "a1:t10", // add "many" tasks (t) for account 1
"a2:t5", "a2:t8", "a3:t6"); // add further tasks for other accounts, such that a1 has to "wait"
private final List<String> declaredQueues = new ArrayList<>();
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RabbitAdmin rabbitAdmin;
@Autowired
private DirectExchange directExchange;
@Autowired
private SimpleMessageListenerContainer listenerContainer;
@Test
public void enqueuedTasksAreProcessedRoundRobin() {
tasks.forEach(task -> {
String[] accountAndTask = task.split(":");
declareQueue(accountAndTask[0]);
rabbitTemplate.convertAndSend(accountAndTask[0], accountAndTask[1] + " from account " + accountAndTask[0]);
});
}
private void declareQueue(String routingKey) {
if (!declaredQueues.contains(routingKey)) {
Queue queue = new Queue(routingKey);
rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(routingKey));
listenerContainer.stop();
listenerContainer.addQueues(queue);
listenerContainer.start();
declaredQueues.add(routingKey);
}
}
@Configuration
public static class RoundRobinQueueConfiguration {
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setExchange(EXCHANGE);
return template;
}
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public DirectExchange directExchange(RabbitAdmin rabbitAdmin) {
DirectExchange directExchange = new DirectExchange(EXCHANGE);
rabbitAdmin.declareExchange(directExchange);
return directExchange;
}
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory, RabbitAdmin rabbitAdmin) {
Queue queue = new Queue("dummy-queue"); // we need a queue to get the container started...
rabbitAdmin.declareQueue(queue);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setMessageListener(new RoundRobinMessageListener());
container.setQueues(new Queue("dummy-queue"));
container.start();
return container;
}
}
public static class RoundRobinMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
System.out.println("Consumed message " + (new String(message.getBody())));
}
}
}
The number of tasks in this example is arbitrary, but I encoded the expected order of consumption in the task names (t1 to t10) so it is easy to see whether the output matches expectations.
The output of the test is:
Consumed message t1 from account a1
Consumed message t2 from account a2
Consumed message t3 from account a3
Consumed message t4 from account a1
Consumed message t5 from account a2
Consumed message t6 from account a3
Consumed message t7 from account a1
Consumed message t8 from account a2
Consumed message t9 from account a1
Consumed message t10 from account a1
which I guess is what you wanted...