Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Multiple DefaultMessageListenerContainer feeding a single TaskExecutor/ThreadPool (fairly)

I'm working on a spring boot application in which I intend to consume JMS messages from multiple queues - but I'd like to control the overall number of executing threads processing these messages as opposed to controlling the number of threads executing per JMS queue.

In short: Lots of JMS Queues. A single thread pool to do the processing.

Some of the queues will be busier than others (some will probably always have work, others will be idle for long periods) - so I'd like to use the available processing power to do whatever work needs to be done regardless of the source queue.

I've set up a series of DefaultMessageListenerContainers each using a shared TaskExecutor with a fixed pool size. The behavior I observe, though is that one queue will consume all of the available slots - then (even when that first queue becomes empty) the slots don't become available for the other queues' DefaultMessageListenerContainers to use.

This is spelled out in the javadocs for DefaultMessageListenerContainer.setTaskExecutor():

A plain thread pool does not add much value, as this listener container will occupy a number of threads for its entire lifetime.

  • Is there a way around this? How does it work differently in a "J2EE environment" as mentioned in the javadocs?
  • Can I do something like have a MessageListenerContainer which can consume from many queues, for example?
  • If what I want is possible - is it then possible to apply some ordering whereby a message received on a previously idle queue can be given a higher priority?
like image 337
Harry Lime Avatar asked Sep 12 '14 21:09

Harry Lime


1 Answers

I tried to re-create your problem but I can't. May be because of the lack of source code you have shown in this question. But this is what I tried.

import java.io.File;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.jms.listener.adapter.MessageListenerAdapter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.FileSystemUtils;

@Configuration
@EnableAutoConfiguration
public class Application {

    public class Receiver {

        private String name;

        public Receiver(String name) {
            this.name = name;
        }

        /**
         * When you receive a message, print it out, then shut down the application.
         * Finally, clean up any ActiveMQ server stuff.
         */
        public void receiveMessage(String message) {
            System.out.println("Received <" + message + "> @ "+name);
            //context.close();
            FileSystemUtils.deleteRecursively(new File("activemq-data"));
        }
    }

    static String mailboxDestination = "mailbox-destination";

    static String mailboxDestination2= "mailbox-destination2";

    @Bean
    MessageListenerAdapter adapter1() {
        MessageListenerAdapter messageListener
                = new MessageListenerAdapter(new Receiver("MailBox1"));
        messageListener.setDefaultListenerMethod("receiveMessage");
        return messageListener;
    }

    @Bean
    MessageListenerAdapter adapter2() {
        MessageListenerAdapter messageListener
                = new MessageListenerAdapter(new Receiver("MailBox2"));
        messageListener.setDefaultListenerMethod("receiveMessage");
        return messageListener;
    }

    @Bean
    DefaultMessageListenerContainer container(ConnectionFactory connectionFactory) throws Exception {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setMessageListener(adapter1());
        container.setConnectionFactory(connectionFactory);
        container.setDestinationName(mailboxDestination);
        container.setTaskExecutor(taskExecutor());

        return container;
    }

    @Bean
    DefaultMessageListenerContainer container2(ConnectionFactory connectionFactory) throws Exception {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setMessageListener(adapter2());
        container.setConnectionFactory(connectionFactory);
        container.setDestinationName(mailboxDestination2);
        container.setTaskExecutor(taskExecutor());

        return container;
    }

    @Bean
    TaskExecutor taskExecutor() throws Exception {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(10);
        taskExecutor.setMaxPoolSize(100);
        taskExecutor.setQueueCapacity(1000);
        taskExecutor.setThreadGroupName("MyThreads");

        return taskExecutor;
    }

    public static void main(String[] args) {
        // Clean out any ActiveMQ data from a previous run
        FileSystemUtils.deleteRecursively(new File("activemq-data"));

        // Launch the application
        ConfigurableApplicationContext context = SpringApplication.run(Application.class, args);

        // Send a message

        final JmsTemplate jmsTemplate = context.getBean(JmsTemplate.class);
        System.out.println("Sending a new message.");
        new Thread() {
            public void run() {
                for(int i=0;i<100;i++) {
                    final String message = (i+1)+" ping!";
                    MessageCreator messageCreator = new MessageCreator() {
                        @Override
                        public Message createMessage(Session session) throws JMSException {
                            return session.createTextMessage(message);
                        }
                    };
                    jmsTemplate.send(mailboxDestination, messageCreator);
                }
            }
        }.start();

        new Thread() {
            public void run() {
                for(int i=0;i<100;i++) {
                    final String message = (i+1)+" ping!";
                    MessageCreator messageCreator = new MessageCreator() {
                        @Override
                        public Message createMessage(Session session) throws JMSException {
                            return session.createTextMessage(message);
                        }
                    };
                    jmsTemplate.send(mailboxDestination2, messageCreator);
                }
            }
        }.start();

    }

}

And the output I am getting is;

Received <1 ping!> @ MailBox2
Received <1 ping!> @ MailBox1
Received <2 ping!> @ MailBox2
Received <2 ping!> @ MailBox1
Received <3 ping!> @ MailBox2
Received <3 ping!> @ MailBox1
Received <4 ping!> @ MailBox2
Received <4 ping!> @ MailBox1
Received <5 ping!> @ MailBox2
Received <5 ping!> @ MailBox1
Received <6 ping!> @ MailBox2
Received <6 ping!> @ MailBox1
Received <7 ping!> @ MailBox2
Received <7 ping!> @ MailBox1
Received <8 ping!> @ MailBox2
Received <8 ping!> @ MailBox1
Received <9 ping!> @ MailBox2
Received <9 ping!> @ MailBox1
Received <10 ping!> @ MailBox2
Received <10 ping!> @ MailBox1
Received <11 ping!> @ MailBox2
Received <11 ping!> @ MailBox1
Received <12 ping!> @ MailBox1
Received <12 ping!> @ MailBox2
Received <13 ping!> @ MailBox1
Received <13 ping!> @ MailBox2
Received <14 ping!> @ MailBox1
Received <14 ping!> @ MailBox2
Received <15 ping!> @ MailBox2
Received <15 ping!> @ MailBox1
Received <16 ping!> @ MailBox2
Received <16 ping!> @ MailBox1
Received <17 ping!> @ MailBox2
Received <17 ping!> @ MailBox1
Received <18 ping!> @ MailBox2
Received <18 ping!> @ MailBox1
Received <19 ping!> @ MailBox1
Received <19 ping!> @ MailBox2
Received <20 ping!> @ MailBox1
Received <20 ping!> @ MailBox2
Received <21 ping!> @ MailBox1
Received <21 ping!> @ MailBox2
Received <22 ping!> @ MailBox1
Received <22 ping!> @ MailBox2
Received <23 ping!> @ MailBox1
Received <23 ping!> @ MailBox2
Received <24 ping!> @ MailBox1
Received <24 ping!> @ MailBox2
Received <25 ping!> @ MailBox1
Received <25 ping!> @ MailBox2
Received <26 ping!> @ MailBox1
Received <26 ping!> @ MailBox2
Received <27 ping!> @ MailBox1
Received <27 ping!> @ MailBox2
Received <28 ping!> @ MailBox2
Received <28 ping!> @ MailBox1
Received <29 ping!> @ MailBox2
Received <29 ping!> @ MailBox1
Received <30 ping!> @ MailBox2
Received <30 ping!> @ MailBox1
Received <31 ping!> @ MailBox2
Received <31 ping!> @ MailBox1
Received <32 ping!> @ MailBox2
Received <33 ping!> @ MailBox2
Received <32 ping!> @ MailBox1
Received <34 ping!> @ MailBox2
Received <33 ping!> @ MailBox1
Received <34 ping!> @ MailBox1
Received <35 ping!> @ MailBox2
Received <35 ping!> @ MailBox1
Received <36 ping!> @ MailBox2
Received <36 ping!> @ MailBox1
Received <37 ping!> @ MailBox2
Received <37 ping!> @ MailBox1
Received <38 ping!> @ MailBox2
Received <38 ping!> @ MailBox1
Received <39 ping!> @ MailBox2
Received <39 ping!> @ MailBox1
Received <40 ping!> @ MailBox2
Received <40 ping!> @ MailBox1
Received <41 ping!> @ MailBox2
Received <42 ping!> @ MailBox2
Received <43 ping!> @ MailBox2
Received <44 ping!> @ MailBox2
Received <45 ping!> @ MailBox2
Received <46 ping!> @ MailBox2
Received <47 ping!> @ MailBox2
Received <48 ping!> @ MailBox2
Received <49 ping!> @ MailBox2
Received <50 ping!> @ MailBox2
Received <51 ping!> @ MailBox2
Received <52 ping!> @ MailBox2
Received <53 ping!> @ MailBox2
Received <54 ping!> @ MailBox2
Received <55 ping!> @ MailBox2
Received <56 ping!> @ MailBox2
Received <57 ping!> @ MailBox2
Received <58 ping!> @ MailBox2
Received <59 ping!> @ MailBox2
Received <60 ping!> @ MailBox2
Received <61 ping!> @ MailBox2
Received <62 ping!> @ MailBox2
Received <63 ping!> @ MailBox2
Received <64 ping!> @ MailBox2
Received <65 ping!> @ MailBox2
Received <66 ping!> @ MailBox2
Received <67 ping!> @ MailBox2
Received <68 ping!> @ MailBox2
Received <69 ping!> @ MailBox2
Received <70 ping!> @ MailBox2
Received <71 ping!> @ MailBox2
Received <72 ping!> @ MailBox2
Received <73 ping!> @ MailBox2
Received <74 ping!> @ MailBox2
Received <75 ping!> @ MailBox2
Received <76 ping!> @ MailBox2
Received <77 ping!> @ MailBox2
Received <78 ping!> @ MailBox2
Received <79 ping!> @ MailBox2
Received <80 ping!> @ MailBox2
Received <81 ping!> @ MailBox2
Received <82 ping!> @ MailBox2
Received <83 ping!> @ MailBox2
Received <84 ping!> @ MailBox2
Received <85 ping!> @ MailBox2
Received <86 ping!> @ MailBox2
Received <87 ping!> @ MailBox2
Received <88 ping!> @ MailBox2
Received <89 ping!> @ MailBox2
Received <90 ping!> @ MailBox2
Received <91 ping!> @ MailBox2
Received <92 ping!> @ MailBox2
Received <93 ping!> @ MailBox2
Received <94 ping!> @ MailBox2
Received <95 ping!> @ MailBox2
Received <96 ping!> @ MailBox2
Received <97 ping!> @ MailBox2
Received <98 ping!> @ MailBox2
Received <99 ping!> @ MailBox2
Received <100 ping!> @ MailBox2
Received <41 ping!> @ MailBox1
Received <42 ping!> @ MailBox1
Received <43 ping!> @ MailBox1
Received <44 ping!> @ MailBox1
Received <45 ping!> @ MailBox1
Received <46 ping!> @ MailBox1
Received <47 ping!> @ MailBox1
Received <48 ping!> @ MailBox1
Received <49 ping!> @ MailBox1
Received <50 ping!> @ MailBox1
Received <51 ping!> @ MailBox1
Received <52 ping!> @ MailBox1
Received <53 ping!> @ MailBox1
Received <54 ping!> @ MailBox1
Received <55 ping!> @ MailBox1
Received <56 ping!> @ MailBox1
Received <57 ping!> @ MailBox1
Received <58 ping!> @ MailBox1
Received <59 ping!> @ MailBox1
Received <60 ping!> @ MailBox1
Received <61 ping!> @ MailBox1
Received <62 ping!> @ MailBox1
Received <63 ping!> @ MailBox1
Received <64 ping!> @ MailBox1
Received <65 ping!> @ MailBox1
Received <66 ping!> @ MailBox1
Received <67 ping!> @ MailBox1
Received <68 ping!> @ MailBox1
Received <69 ping!> @ MailBox1
Received <70 ping!> @ MailBox1
Received <71 ping!> @ MailBox1
Received <72 ping!> @ MailBox1
Received <73 ping!> @ MailBox1
Received <74 ping!> @ MailBox1
Received <75 ping!> @ MailBox1
Received <76 ping!> @ MailBox1
Received <77 ping!> @ MailBox1
Received <78 ping!> @ MailBox1
Received <79 ping!> @ MailBox1
Received <80 ping!> @ MailBox1
Received <81 ping!> @ MailBox1
Received <82 ping!> @ MailBox1
Received <83 ping!> @ MailBox1
Received <84 ping!> @ MailBox1
Received <85 ping!> @ MailBox1
Received <86 ping!> @ MailBox1
Received <87 ping!> @ MailBox1
Received <88 ping!> @ MailBox1
Received <89 ping!> @ MailBox1
Received <90 ping!> @ MailBox1
Received <91 ping!> @ MailBox1
Received <92 ping!> @ MailBox1
Received <93 ping!> @ MailBox1
Received <94 ping!> @ MailBox1
Received <95 ping!> @ MailBox1
Received <96 ping!> @ MailBox1
Received <97 ping!> @ MailBox1
Received <98 ping!> @ MailBox1
Received <99 ping!> @ MailBox1
Received <100 ping!> @ MailBox1

Correct me if I am wrong but with this configuration I don't see any issues. Two Queues and One Task Executor.

like image 71
shazin Avatar answered Oct 20 '22 00:10

shazin