Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spring AMQP RabbitMQ implementing priority queue

After Google for a few days, and i believe i am totally lost. I would like to implement a kind of priority queue that has about 3 queues:

  1. high priority queue (daily), that needs to be process first.
  2. mid priority queue (weekly), that will process if no items in queue #1. (it is ok message in this queue it never process at all)
  3. low priority queue (monthly), that will process if no items in queue #1 & #2. (it is ok message in this queue it never process at all)

Initially I have the following flow, to have a consumer to consume messages from all three queues and checks whether there is any items in queue #1, #2 and #3. and then I realize that this is wrong because:

  1. I am totally lost with a question: "How do I know which queue it is coming from?".
  2. I'm already consuming a message regardless from any queue, So if I get an object from lower priority queue, am I gonna put it back to the queue if I discover there is a message at the higher priority queue?

Following is my current configurations, which shows what an idiot I am.

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

<rabbit:connection-factory id="connectionFactory" host="localhost" />

<rabbit:template id="amqpTemplatead_daily" connection-factory="connectionFactory"
    exchange="" routing-key="daily_queue"/>

<rabbit:template id="amqpTemplatead_weekly" connection-factory="connectionFactory"
    exchange="" routing-key="weekly_queue"/>

<rabbit:template id="amqpTemplatead_monthly" connection-factory="connectionFactory"
    exchange="" routing-key="monthly_queue"/>

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="daily_queue" />
</rabbit:listener-container>

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="weekly_queue" />
</rabbit:listener-container>    

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="Consumer" method="consume" queue-names="monthly_queue" />
</rabbit:listener-container>    

<bean id="Consumer" class="com.test.Consumer" />

</beans>

Any idea how should I tackle this with priority queue?

ps: I also wonder, if Apache Camel has something I can depend on?

UPDATE 1: I just saw this from Apache Camel: "https://issues.apache.org/jira/browse/CAMEL-2537" the sequencer on JMSPriority seems to be what im looking for, anyone has tried this before?

UPDATE 2: assuming i am to use RabbitMQ's plugin base on @Gary Russell recommendation, I have the following spring-rabbitmq context XML configuration, which seems to make sense (by guest..):

<rabbit:queue name="ad_google_dfa_reporting_queue">
    <rabbit:queue-arguments>
            <entry key="x-max-priority" value="10"/>
    </rabbit:queue-arguments>
</rabbit:queue>

<rabbit:listener-container connection-factory="connectionFactory">
    <rabbit:listener ref="adGoogleDfaReporting" method="consume" queue-names="ad_google_dfa_reporting_queue" />
</rabbit:listener-container>

<bean id="Consumer" class="com.test.Consumer" />

The above xml configuration has successfully create a Queue, with name: "ad_google_dfa_reporting_queue", and with Parameter arguments: x-max-priority: 10 & durable: true

But not when comes to the code that send the message with priority, I totally lost it. How to define the priority as mention in the Sample URL: https://github.com/rabbitmq/rabbitmq-priority-queue/blob/master/examples/java/src/com/rabbitmq/examples/PriorityQueue.java

AmqpTemplate amqpTemplateGoogleDfaReporting = (AmqpTemplate) applicationContext.getBean("amqpTemplateadGoogleDfaReporting");
amqpTemplateGoogleDfaReporting.convertAndSend("message"); // how to define message priority?

UPDATE 3: Based on the @Gary's answer, i manage to sent message with priority set in the message, as per image below: message priority screenshot However, when i sent in 1000 messages with random priority between 1-10, the consumer is consuming message with all kinds of priority. (I was expecting only the high priority message to be consume first). following is the code for Message producer:

    Random random = new Random();
    for (int i=0; i< 1000; i++){
        final int priority = random.nextInt(10 - 1 + 1) + 1;

        DfaReportingModel model = new DfaReportingModel();
        model.setReportType(DfaReportingModel.ReportType.FACT);
        model.setUserProfileId(0l + priority);
        amqpTemplateGoogleDfaReporting.convertAndSend(model, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setPriority(priority);
                return message;
            }
        });
    }

And following is the code for Message consumer:

    public void consume(DfaReportingModel message) {
        System.out.println(message.getUserProfileId());

        Thread.sleep(500);
    }

The result im getting:

9, 10, 7, 9, 6, 4, 10, 10, 3, 10, 6, 1, 5, 6, 6, 3, 4, 7, 6, 8, 3, 1, 4, 5, 5, 3, 10, 9, 5, 1, 8, 9, 6, 9, 3, 10, 7, 4, 8, 7, 3, 4, 8, 2, 6, 9, 6, 4, 7, 7, 2, 8, 4, 4, 1,

UPDATE 4: Problem solved! Knowing the sample code from https://github.com/rabbitmq/rabbitmq-priority-queue is working in my environment, I presume that the problem is around the spring context. Hence, after countless time on try and error with different type of configurations, and I pin point the exact combination that will make this works! and is as per following:

    <rabbit:queue name="ad_google_dfa_reporting_queue">
    <rabbit:queue-arguments>
        <entry key="x-max-priority">
            <value type="java.lang.Integer">10</value> <!-- MUST specifically define java.lang.Integer to get it to work -->
        </entry>
    </rabbit:queue-arguments>
</rabbit:queue>

Without specifically define the value is Integer type, the priority queue does not work. Finally, it is solved. Yay!

like image 685
Reusable Avatar asked Nov 10 '14 07:11

Reusable


2 Answers

RabbitMQ now has a priority queue plugin where messages are delivered in priority order. It would be better to use that rather than your scheme of requeueing low priority messages which will be quite expensive at runtime.

EDIT:

When using the rabbitTemplate.convertAndSend(...) methods, and you want to set the priority property on the message, you either need to implement a custom MessagePropertiesConverter in the template (subclass the DefaultMessagePropertiesConverter) or use the convertAnSend variants that take a message post-processor; e.g.:

template.convertAndSend("exchange", "routingKey", "message", new MessagePostProcessor() {

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties().setPriority(5);
        return message;
    }
});
like image 68
Gary Russell Avatar answered Oct 21 '22 04:10

Gary Russell


RabbitMQ has priority queue implementation in the core as of version 3.5.0.

You can declare priority queues using the x-max-priority argument. This argument should be an integer indicating the maximum priority the queue should support. For example, using the Java client:

Channel ch = ...;
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
ch.queueDeclare("my-priority-queue", true, false, false, args);

You can then publish prioritised messages using the priority field of basic.properties. Larger numbers indicate higher priority.

like image 26
Rahul Avatar answered Oct 21 '22 05:10

Rahul