After Googling for a few days, I believe I am totally lost. I would like to implement a kind of priority queue that has about 3 queues:
Initially I had the following flow: a single consumer consumes messages from all three queues and checks whether there are any items in queue #1, #2 and #3. Then I realized that this is wrong because:
Following is my current configuration, which shows what an idiot I am.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<rabbit:connection-factory id="connectionFactory" host="localhost" />
<rabbit:template id="amqpTemplatead_daily" connection-factory="connectionFactory"
exchange="" routing-key="daily_queue"/>
<rabbit:template id="amqpTemplatead_weekly" connection-factory="connectionFactory"
exchange="" routing-key="weekly_queue"/>
<rabbit:template id="amqpTemplatead_monthly" connection-factory="connectionFactory"
exchange="" routing-key="monthly_queue"/>
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="Consumer" method="consume" queue-names="daily_queue" />
</rabbit:listener-container>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="Consumer" method="consume" queue-names="weekly_queue" />
</rabbit:listener-container>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="Consumer" method="consume" queue-names="monthly_queue" />
</rabbit:listener-container>
<bean id="Consumer" class="com.test.Consumer" />
</beans>
Any idea how I should tackle this with a priority queue?
PS: I also wonder whether Apache Camel has something I can depend on.
UPDATE 1: I just saw this from Apache Camel: "https://issues.apache.org/jira/browse/CAMEL-2537". The sequencer on JMSPriority seems to be what I'm looking for. Has anyone tried this before?
UPDATE 2: Assuming I use RabbitMQ's plugin based on @Gary Russell's recommendation, I have the following spring-rabbitmq context XML configuration, which seems to make sense (by guess):
<rabbit:queue name="ad_google_dfa_reporting_queue">
<rabbit:queue-arguments>
<entry key="x-max-priority" value="10"/>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="adGoogleDfaReporting" method="consume" queue-names="ad_google_dfa_reporting_queue" />
</rabbit:listener-container>
<bean id="Consumer" class="com.test.Consumer" />
The above XML configuration successfully created a queue named "ad_google_dfa_reporting_queue" with the arguments x-max-priority: 10 and durable: true.
But when it comes to the code that sends a message with a priority, I am totally lost. How do I define the priority as shown in the sample at https://github.com/rabbitmq/rabbitmq-priority-queue/blob/master/examples/java/src/com/rabbitmq/examples/PriorityQueue.java
AmqpTemplate amqpTemplateGoogleDfaReporting = (AmqpTemplate) applicationContext.getBean("amqpTemplateadGoogleDfaReporting");
amqpTemplateGoogleDfaReporting.convertAndSend("message"); // how to define message priority?
UPDATE 3: Based on @Gary's answer, I managed to send messages with the priority set in the message. However, when I sent 1000 messages with a random priority between 1 and 10, the consumer consumed messages of all kinds of priority (I was expecting the high-priority messages to be consumed first). Following is the code for the message producer:
Random random = new Random();
for (int i=0; i< 1000; i++){
final int priority = random.nextInt(10 - 1 + 1) + 1;
DfaReportingModel model = new DfaReportingModel();
model.setReportType(DfaReportingModel.ReportType.FACT);
model.setUserProfileId(0l + priority);
amqpTemplateGoogleDfaReporting.convertAndSend(model, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(priority);
return message;
}
});
}
And following is the code for the message consumer:
public void consume(DfaReportingModel message) throws InterruptedException {
System.out.println(message.getUserProfileId());
Thread.sleep(500); // simulate slow processing so that messages queue up
}
The result I'm getting:
9, 10, 7, 9, 6, 4, 10, 10, 3, 10, 6, 1, 5, 6, 6, 3, 4, 7, 6, 8, 3, 1, 4, 5, 5, 3, 10, 9, 5, 1, 8, 9, 6, 9, 3, 10, 7, 4, 8, 7, 3, 4, 8, 2, 6, 9, 6, 4, 7, 7, 2, 8, 4, 4, 1,
UPDATE 4: Problem solved! Knowing that the sample code from https://github.com/rabbitmq/rabbitmq-priority-queue works in my environment, I presumed that the problem was in the Spring context. After countless rounds of trial and error with different configurations, I pinpointed the exact combination that makes this work:
<rabbit:queue name="ad_google_dfa_reporting_queue">
<rabbit:queue-arguments>
<entry key="x-max-priority">
<value type="java.lang.Integer">10</value> <!-- MUST specifically define java.lang.Integer to get it to work -->
</entry>
</rabbit:queue-arguments>
</rabbit:queue>
Without explicitly declaring the value as an Integer, the priority queue does not work. Finally, it is solved. Yay!
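My understanding of why the type matters (an assumption on my part, not verified against the broker source): a plain value="10" entry in the Spring XML map is stored as a java.lang.String, so the queue is declared with the string "10" instead of the number 10, and the broker does not treat that as a valid maximum priority. The sketch below only mimics the two argument maps with plain Java collections; the class and method names are made up for illustration and no Spring or RabbitMQ classes are involved.

```java
import java.util.HashMap;
import java.util.Map;

public class QueueArgumentTypes {
    // Mimics <entry key="x-max-priority" value="10"/>: the value is a String.
    static Map<String, Object> untypedArgs() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-max-priority", "10");
        return args;
    }

    // Mimics <value type="java.lang.Integer">10</value>: the value is an Integer.
    static Map<String, Object> typedArgs() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-max-priority", Integer.valueOf(10));
        return args;
    }

    // True only when the argument is carried as a number rather than as text.
    static boolean isNumeric(Map<String, Object> args) {
        return args.get("x-max-priority") instanceof Number;
    }

    public static void main(String[] ignored) {
        System.out.println("untyped is numeric: " + isNumeric(untypedArgs()));
        System.out.println("typed is numeric:   " + isNumeric(typedArgs()));
    }
}
```

Both maps look identical when printed, which is probably why the management UI showed x-max-priority: 10 in either case.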
RabbitMQ now has a priority queue plugin where messages are delivered in priority order. It would be better to use that rather than your scheme of requeueing low priority messages which will be quite expensive at runtime.
EDIT:
When using the rabbitTemplate.convertAndSend(...) methods, and you want to set the priority property on the message, you either need to implement a custom MessagePropertiesConverter in the template (subclass the DefaultMessagePropertiesConverter) or use the convertAndSend variants that take a message post-processor; e.g.:
template.convertAndSend("exchange", "routingKey", "message", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setPriority(5);
return message;
}
});
RabbitMQ has a priority queue implementation in the core as of version 3.5.0.
You can declare priority queues using the x-max-priority argument. This argument should be an integer indicating the maximum priority the queue should support. For example, using the Java client:
Channel ch = ...;
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
ch.queueDeclare("my-priority-queue", true, false, false, args);
You can then publish prioritised messages using the priority field of basic.properties. Larger numbers indicate higher priority.
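To make the ordering rule concrete, here is a small local sketch that does not talk to RabbitMQ at all. It models the queue with java.util.PriorityQueue, assuming that larger priorities are delivered first and that messages of equal priority keep their arrival order. The class PriorityOrderSketch and its Msg type are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class PriorityOrderSketch {
    // Hypothetical stand-in for a queued message: its priority and arrival index.
    static final class Msg {
        final int priority;
        final int arrival;
        Msg(int priority, int arrival) {
            this.priority = priority;
            this.arrival = arrival;
        }
    }

    // Returns the arrival indices in the order the modelled queue hands them out:
    // higher priority first, earlier arrival first among equal priorities.
    static List<Integer> deliveryOrder(List<Integer> publishedPriorities) {
        Comparator<Msg> order = Comparator.<Msg>comparingInt(m -> m.priority)
                .reversed()
                .thenComparingInt(m -> m.arrival);
        PriorityQueue<Msg> queue = new PriorityQueue<>(order);
        for (int i = 0; i < publishedPriorities.size(); i++) {
            queue.add(new Msg(publishedPriorities.get(i), i));
        }
        List<Integer> delivered = new ArrayList<>();
        while (!queue.isEmpty()) {
            delivered.add(queue.poll().arrival);
        }
        return delivered;
    }

    public static void main(String[] ignored) {
        // Messages 1 and 4 have priority 10, messages 0 and 2 have priority 3.
        System.out.println(deliveryOrder(Arrays.asList(3, 10, 3, 1, 10)));
        // prints [1, 4, 0, 2, 3]
    }
}
```

A real broker only reorders messages that are still waiting in the queue, so this model matches a consumer that is slower than the publisher.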