Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

HornetQ: How to reuse XAConnection and XASession

I'm facing some issues trying to reuse a XAConnection and XASession on multiple workers in my JBoss application. I've managed simplify the issue down to just one single method. It should be able to both Produce and Consumer a message using the same connection and session. Currently my application has a lot of queues and workers, where each worker is currently initiating and starting each own connection and session, instead of sharing it. Shouldn't that be possible?

Here is my code example:

import org.apache.log4j.Logger;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.ejb.Singleton;
import javax.ejb.Startup;
import javax.jms.*;
import javax.jms.Queue;
import javax.naming.InitialContext;

@Singleton
@Startup
public class QueueTest {

    private Logger logger = Logger.getLogger(QueueTest.class);

    @PostConstruct
    public void startup() {
        try {
            String queue = "queue/Queue1";
            String message = "test";

            //setting up connection
            InitialContext iniCtx = new InitialContext();
            XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA");
            XAConnection connection = qcf.createXAConnection();
            connection.start();
            logger.debug("creating connection at " + new java.util.Date());

            //setting up session
            XASession session = connection.createXASession();
            logger.debug("creating session at " + new java.util.Date());

            //find the queue
            Object queueObj = iniCtx.lookup(queue);
            Queue jmsQueue = (javax.jms.Queue)queueObj;

            //adding message to queue
            javax.jms.MessageProducer producer = session.createProducer(jmsQueue);
            javax.jms.TextMessage textMessage = session.createTextMessage(message);
            producer.send(textMessage);
            producer.close();
            logger.debug("Message added to queue");

            //receiving message from queue
            javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue);
            javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000);

            if (messageReceived==null)
                throw new Exception("No message reveived");

            logger.debug("Got message:"+messageReceived.getText());
            consumer.close();
        }
        catch(Exception e) {
            logger.debug("Error: " + e.getMessage(), e);
        }
    }

    @PreDestroy
    public void shutdown() {

    }
}

It results in this output:

11:47:17,905 DEBUG [QueueTest] (MSC service thread 1-8) creating connection at Thu Sep 05 11:47:17 CEST 2013
11:47:18,041 DEBUG [QueueTest] (MSC service thread 1-8) creating session at Thu Sep 05 11:47:18 CEST 2013
11:47:18,065 DEBUG [QueueTest] (MSC service thread 1-8) Message added to queue
11:47:23,081 DEBUG [QueueTest] (MSC service thread 1-8) Error: No message reveived

As you can see, no message is received by the Consumer. Why?

EDIT 1:

package dk.energimidt.uapi.zigbee.services;

import org.apache.log4j.Logger;

import javax.ejb.Stateless;
import javax.ejb.TransactionAttribute;
import javax.ejb.TransactionAttributeType;
import javax.jms.Queue;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.naming.InitialContext;

@TransactionAttribute(TransactionAttributeType.REQUIRED)
@Stateless
public class QueueTestWorkerBean implements QueueTestWorker {

    private Logger logger = Logger.getLogger(QueueTestWorkerBean.class);

    public void run() {
        try {
            String queue = "queue/Queue1";
            String message = "test";

            //setting up connection
            InitialContext iniCtx = new InitialContext();
            XAConnectionFactory qcf = (XAConnectionFactory) iniCtx.lookup("java:/JmsXA");
            XAConnection connection = qcf.createXAConnection();
            connection.start();
            logger.debug("creating connection at " + new java.util.Date());

            //setting up session
            XASession session = connection.createXASession();
            logger.debug("creating session at " + new java.util.Date());

            //find the queue
            Object queueObj = iniCtx.lookup(queue);
            Queue jmsQueue = (javax.jms.Queue)queueObj;

            //adding message to queue
            javax.jms.MessageProducer producer = session.createProducer(jmsQueue);
            javax.jms.TextMessage textMessage = session.createTextMessage(message);
            producer.send(textMessage);
            producer.close();
            session.commit();
            logger.debug("Message added to queue");

            //receiving message from queue
            javax.jms.MessageConsumer consumer = session.createConsumer(jmsQueue);
            javax.jms.TextMessage messageReceived = (javax.jms.TextMessage)consumer.receive(5000);

            if (messageReceived==null)
                throw new Exception("No message reveived");

            logger.debug("Got message:"+messageReceived.getText());
            consumer.close();

            connection.close();
        }
        catch(Exception e) {
            logger.debug("Error: " + e.getMessage(), e);
        }
    }
}

Now I get an exception on the Session.Commit():

10:46:03,697 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating connection at Tue Sep 17 10:46:03 CEST 2013
10:46:04,343 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) creating session at Tue Sep 17 10:46:04 CEST 2013
10:46:04,355 DEBUG [QueueTestWorkerBean] (MSC service thread 1-14) Error: XA connection: javax.jms.TransactionInProgressException: XA connection
    at org.hornetq.ra.HornetQRASession.commit(HornetQRASession.java:386)
    at QueueTestWorkerBean.run(QueueTestWorkerBean.java:45) [library-1.0.0.jar:]
like image 905
dhrm Avatar asked Oct 22 '22 01:10

dhrm


1 Answers

I see a few (actually 2) mix ups there:

i - you are using an XA Session but you are not declaring any transaction boundaries... which is usually done on Session Beans and MDBs. I'm not sure you can do that on this Stateless.

if you don't use any declarative Transaction, you would have to enlist the XID manually.

ii - the jmsXA is the default Resource Adapter connection factory. It has a pool on it already. So whenever you create a new session you are taking out of the pool. When you close it you are returning it to the pool.

You could use a regular Connection Factory. Just in the InVMConnectionFactory (or whatever you have defined on your standalone, outside of the PooledConnectionFactories assuming you are on JBoss... and then just use regular JMS.

Even the regular Connection Factory can be used with XA but on that case you will need to make sure you enlist it using the Transaction Manager's api directly.

if you use your regular connection factory, you can then just keep it connected as long as you want.

Please let me know how it goes and I will help you. I know you started a bounty.. but I would have answered it for free :)

I couldn't find any example on the EJB Tutorial about using Transactions with a Singleton.

I would recommend you using it through a Statless or Stateful Session Bean and then applying the @TransactionAttribute to the Bean.

The Java EE 6 Tutorial has some good information about it:

http://docs.oracle.com/javaee/6/tutorial/doc/bncij.html

notice that a message won't be available until you commit. So if you send a message within a transaction you won't be able to receive it on the same transaction.

On your edit1 example you are sending a message and consuming it on the same transaction. That won't work as you first need to commit the producing method before you can consume it. You would need two transactions on this case, so Edit1 is broken.

Also: make sure you close the Connection at the end. Since you are using JmsXA (or a pooled connection factory) you will have the polling done automatically by the Application Server.

like image 183
Clebert Suconic Avatar answered Oct 23 '22 15:10

Clebert Suconic