Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Concurrent Processing of JMS Message Listener Weblogic

I'm running a test case on JMS and found processing is sequential. When I fired 200 requests to a servlet which sends messages using JMS and receiver(messageListner) is reciving requests sequentially. How to receive concurrent requests? Do we have any parameters to set? I read JMS tutorials and API's that in a same session messages are delivered sequntially, even I'm creating a new session for each send request & 10 sessions at receiving end still processing is sequential.

public class ProducerServlet extends javax.servlet.http.HttpServlet implements
    javax.servlet.Servlet {

// Defines the JNDI context factory.
public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

// Defines the JMS context factory.
public final static String JMS_FACTORY = "jms/TestConnectionFactory";

// Defines the queue.
public final static String QUEUE = "jms/TestJMSQueue";

public final static String TOPIC = "jms/TestTopic";

TestJMSListener jms = new TestJMSListener();
ConnectionFactory connectionFactory = null;
Queue dest1 = null;
Topic dest =null;
Connection connection = null;
MessageProducer producer = null;

protected void doGet(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException {
        try {
            connection = connectionFactory.createConnection();              

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(dest1);
        TextMessage message = session.createTextMessage();

        message.setText("This is message from JMSSECOND DEMO "
                + request.getParameter("Num"));
        System.out.println("Sending message: " + message.getText());
        producer.send(message);
        producer.send(session.createMessage());
    } catch (Exception e) {
        System.out.println("Exception occurred: " + e.toString());
    }

}

@Override
public void init(ServletConfig arg0) throws ServletException {      
    Context jndiContext = null;
    try {

        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, "http://localhost:7001");
        jndiContext = new InitialContext(env);
    } catch (NamingException e) {
        System.out.println("Could not create JNDI API context: "
                + e.toString());            
    }

    try {
        connectionFactory = (ConnectionFactory) jndiContext
                .lookup(JMS_FACTORY);
        dest1 = (Queue) jndiContext.lookup(QUEUE);
    } catch (Exception e) {
        System.out.println("JNDI API lookup failed: " + e.toString());
        e.printStackTrace();            
    }

}

}

Listner implementation where after receiving a message I'm going to sleep(doing something for a second).

public class TestJMSListener implements MessageListener {

// Defines the JNDI context factory.
public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

// Defines the JMS context factory.
public final static String JMS_FACTORY = "jms/TestConnectionFactory";

// Defines the queue.
public final static String QUEUE = "jms/TestJMSQueue";

public final static String TOPIC = "jms/TestTopic";

public TestJMSListener() {

    System.out.println("********* Consumer check **********");

    Context jndiContext = null;
    ConnectionFactory connectionFactory = null;
    Connection connection[] = null;
    Session session[] = null;
    Queue dest1 = null;
    Topic dest = null;
    MessageConsumer consumer[] = null;
    // TextMessage message = null;

    try {
        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, "http://localhost:7001");
        jndiContext = new InitialContext(env);
    } catch (NamingException e) {
        System.out.println("Could not create JNDI API context: "
                + e.toString());
        System.exit(1);
    }

    try {
        connectionFactory = (ConnectionFactory) jndiContext
                .lookup(JMS_FACTORY);
        dest1 = (Queue) jndiContext.lookup(QUEUE);
    } catch (Exception e) {
        System.out.println("JNDI API lookup failed: " + e.toString());
        System.exit(1);
    }
    connection = new Connection[10];
    session = new Session[10];
    consumer = new MessageConsumer[10];
    for (int i = 0; i < 10; i++) {
        try {

            connection[i] = connectionFactory.createConnection();
            session[i] = connection[i].createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            consumer[i] = session[i].createConsumer(dest);
            consumer[i].setMessageListener(this);
            connection[i].start();
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
        }
    }
}

@Override
public void onMessage(Message m) {

    if (m instanceof TextMessage) {
        TextMessage message = (TextMessage) m;
        try {
            System.out.println("Reading message from Listener: "
                    + new Date() + message.getText());
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

 }

I'm using Weblogic 11g, with default configurations for ConnectionFactory & Queue. When I used Topic it actually delivering only one message per second(i.e. after completion of first message) and for Queue it is delivering 2 to 3 messages per second. How to make my listener to support concurrent processing.

Final Solution

Added more listener objects insted multiple sessions/consumers in listners it solved the purpose. Find the Modified Code below.

public class ProducerServlet extends javax.servlet.http.HttpServlet implements
    javax.servlet.Servlet {

// Defines the JNDI context factory.
public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

// Defines the JMS context factory.
public final static String JMS_FACTORY = "jms/TestConnectionFactory";

// Defines the queue.
public final static String QUEUE = "jms/TestJMSQueue";

public final static String TOPIC = "jms/TestTopic";
TestJMSListener listeners[] = new TestJMSListener[20];
ConnectionFactory connectionFactory = null;
Queue dest1 = null;
Topic dest =null;
Connection connection = null;
MessageProducer producer = null;

protected void doGet(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException {
        try {
            connection = connectionFactory.createConnection();              

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(dest1);
        TextMessage message = session.createTextMessage();

        message.setText("This is message from JMSSECOND DEMO "
                + request.getParameter("Num"));
        System.out.println("Sending message: " + message.getText());
        producer.send(message);
        producer.send(session.createMessage());
    } catch (Exception e) {
        System.out.println("Exception occurred: " + e.toString());
    }

}

@Override
public void init(ServletConfig arg0) throws ServletException {      
    Context jndiContext = null;
    try {

        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, "http://localhost:7001");
        jndiContext = new InitialContext(env);
    } catch (NamingException e) {
        System.out.println("Could not create JNDI API context: "
                + e.toString());            
    }

    try {
        connectionFactory = (ConnectionFactory) jndiContext
                .lookup(JMS_FACTORY);
        dest1 = (Queue) jndiContext.lookup(QUEUE);
        for(int i=0;i<listeners.length;i++ ){
        listeners[i]=new TestJMSListener(Integer.toString(i+1));    
        }

    } catch (Exception e) {
        System.out.println("JNDI API lookup failed: " + e.toString());
        e.printStackTrace();            
    }

}

}


public class TestJMSListener implements MessageListener {

// Defines the JNDI context factory.
public final static String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";

// Defines the JMS context factory.
public final static String JMS_FACTORY = "jms/TestConnectionFactory";

// Defines the queue.
public final static String QUEUE = "jms/TestJMSQueue";

public final static String TOPIC = "jms/TestTopic";

public String listnerNum = "";
public TestJMSListener(String listerNo) {
    super();
    System.out.println("********* Consumer check **********");
    listnerNum = listerNo;
    Context jndiContext = null;
    ConnectionFactory connectionFactory = null;
    Connection connection = null;
    Session session = null;
    Queue dest1 = null;
    Topic dest = null;
    MessageConsumer consumer = null;
    // TextMessage message = null;

    try {
        Hashtable env = new Hashtable();
        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
        env.put(Context.PROVIDER_URL, "http://localhost:7001");
        jndiContext = new InitialContext(env);
    } catch (NamingException e) {
        System.out.println("Could not create JNDI API context: "
                + e.toString());
        System.exit(1);
    }

    try {
        connectionFactory = (ConnectionFactory) jndiContext
                .lookup(JMS_FACTORY);
        dest1 = (Queue) jndiContext.lookup(QUEUE);
    } catch (Exception e) {
        System.out.println("JNDI API lookup failed: " + e.toString());
        System.exit(1);
    }
    try{
            connection = connectionFactory.createConnection();
            session = connection.createSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            consumer = session.createConsumer(dest1);
            consumer.setMessageListener(this);
            connection.start();
        } catch (JMSException e) {
            System.out.println("Exception occurred: " + e.toString());
        }


}

@Override
public void onMessage(Message m) {

    if (m instanceof TextMessage) {
        TextMessage message = (TextMessage) m;
        try {
            System.out.println("Reading message from Listener: "+listnerNum+ " : "
                    + new Date() + message.getText());
            Thread.sleep(1000);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

}
like image 403
learner Avatar asked Nov 30 '13 10:11

learner


People also ask

What is concurrency in JMS listener?

The Concurrency property specifies whether the message consumers use connection consumer or serialized processing. To use concurrent processing for a connection, select the Connection consumer setting. To use serial execution, select the Serial mode setting.

How JMS queue works in WebLogic?

A JMS server defines a set of destinations (queues or topics) and any associated persistent storage that reside on a WebLogic Server instance. A JMS server manages connections and handles all message requests for its destinations on behalf of clients.

How does JMS listener work?

The JMS Listener adapter operates in an asynchronous mode. It establishes an asynchronous listener on the queue or topic destination specified by the JNDI name of Destination field. When a qualified message arrives at the destination, the adapter immediately processes the message.


2 Answers

In your code, You have only one Listener instance (Created while Servlet instance created) , thus you will be receiving the messages sequencial only ,
Irrespective of how many sender session you have .. it is Just Queue.
If you want receive concurrently , then you might need multiple Listeners and only one time message will be delivered in any one of the listeners.
If you want to process the Messages concurrently, once its delivered sequencly then create thread pool and deligate the process in seperate thread and return back to listening mode .
Note** in this Mode you may not be handle Ack mode properly since you are ack without completing the Message process.

like image 167
Mani Avatar answered Nov 09 '22 05:11

Mani


I looked your solution and realized that you have several concurrent sessions, consumers etc, but one queue to handle all this. Queue is, well a queue, there all go in line through a pipe sequentially. If you happen to have only one of them - then you have one execution thread on that point and everything goes sequential, because queue does not let concurrent things happen.

If you implement several queues in different threads, your machine is able to handle several calls concurrently. Several queues may imply usage of different queue names etc, but for that hassle you may use a load balancer solution like Apache Camel to actually make the queue selection for you. At least this closed post gives me an understanding that this kind of combination of queues and threads is possible.

Then, the balancer selects individual queue for every request and every queue makes it own sequential work to handle the request. Amount of concurrent sessions is then a matter of configuration.

like image 42
mico Avatar answered Nov 09 '22 07:11

mico