I have an Oracle AQ queue whose payload type is SYS.AQ$_JMS_TEXT_MESSAGE. I'm trying to enqueue a text message into that queue from a Java application.
The equivalent PL/SQL block is:
declare
    r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
    r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    v_message_handle     RAW(16);
    o_payload            SYS.AQ$_JMS_TEXT_MESSAGE;
begin
    o_payload := sys.aq$_jms_text_message.construct;
    o_payload.set_text(xmltype('<user>text</user>').getClobVal());
    sys.dbms_aq.enqueue (
        queue_name         => 'QUEUE_NAME',
        enqueue_options    => r_enqueue_options,
        message_properties => r_message_properties,
        payload            => o_payload,
        msgid              => v_message_handle
    );
    commit;
end;
/
I got most of it right using this guide, but I'm stuck at
o_payload := sys.aq$_jms_text_message.construct;
o_payload.set_text(xmltype('<user>text</user>').getClobVal());
The guide shows how to enqueue a RAW message, but I need a JMS text message; otherwise the payload type doesn't match the queue type.
Any help would be appreciated, because even with the almighty Google I have not been able to find a solution to this problem. Is there a way to do it using the oracle.jdbc.aq classes, or do I just have to suck it up and use the PL/SQL block?
createConsumer(destination); The parameter destination is a Queue or Topic object that the application has created previously. The application then uses the receive() method of the MessageConsumer object to receive a message from the destination, as shown in the following example: Message inMessage = consumer.
JMS supports both messaging models: point-to-point (queuing) and publish-subscribe. JMS was defined to allow Java applications to use enterprise messaging systems. More importantly, it gives Java applications a common way to access those systems.
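To make the difference between the two models concrete, here is a minimal in-memory sketch. It is an illustration only: `MessagingModels`, `PointToPointQueue`, and `Topic` are hypothetical names invented for this example and are not part of the JMS API or of Oracle AQ. It shows that in point-to-point delivery a message is taken by exactly one receiver, while in publish-subscribe every subscriber gets its own copy.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustrative in-memory model only; none of these types belong to the JMS API.
class MessagingModels {

    /** Point-to-point: each message is taken by exactly one receiver. */
    static class PointToPointQueue {
        private final Queue<String> messages = new ArrayDeque<>();

        void send(String text) {
            messages.add(text);
        }

        /** Removes and returns the oldest message, or null if the queue is empty. */
        String receive() {
            return messages.poll();
        }
    }

    /** Publish-subscribe: every subscriber gets its own copy of each message. */
    static class Topic {
        private final List<List<String>> inboxes = new ArrayList<>();

        /** Registers a subscriber and returns the inbox that collects its messages. */
        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            inboxes.add(inbox);
            return inbox;
        }

        void publish(String text) {
            for (List<String> inbox : inboxes) {
                inbox.add(text);
            }
        }
    }

    public static void main(String[] args) {
        PointToPointQueue queue = new PointToPointQueue();
        queue.send("<user>text</user>");
        System.out.println("first receiver:  " + queue.receive());
        System.out.println("second receiver: " + queue.receive());

        Topic topic = new Topic();
        List<String> a = topic.subscribe();
        List<String> b = topic.subscribe();
        topic.publish("<user>text</user>");
        System.out.println("subscriber a: " + a);
        System.out.println("subscriber b: " + b);
    }
}
```

The queue in the question is a point-to-point destination, so it corresponds to the first of the two classes.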
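Just copy and paste this code and try it. If it works for you, go through the code carefully and make sure you understand it.
The four calls in main() are commented out. Uncomment them one at a time, in order (createQueue, then sendMessage, browseMessage, and consumeMessage), and run the program after each change.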
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.TextMessage;
import oracle.AQ.AQQueueTable;
import oracle.AQ.AQQueueTableProperty;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsDestinationProperty;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
public class OracleAQClient {
    public static QueueConnection getConnection() {
        String hostname = "localhost";
        String oracle_sid = "xe";
        int portno = 1521;
        String userName = "jmsuser";
        String password = "jmsuser";
        String driver = "thin";
        QueueConnectionFactory QFac = null;
        QueueConnection QCon = null;
        try {
            // get connection factory, not going through JNDI here
            QFac = AQjmsFactory.getQueueConnectionFactory(hostname, oracle_sid, portno, driver);
            // create connection
            QCon = QFac.createQueueConnection(userName, password);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return QCon;
    }
    public static void createQueue(String user, String qTable, String queueName) {
        try {
            /* Create Queue Tables */
            System.out.println("Creating Queue Table...");
            QueueConnection QCon = getConnection();
            Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
            AQQueueTableProperty qt_prop;
            AQQueueTable q_table = null;
            AQjmsDestinationProperty dest_prop;
            Queue queue = null;
            // the payload type must match the type the queue is meant to carry
            qt_prop = new AQQueueTableProperty("SYS.AQ$_JMS_TEXT_MESSAGE");
            q_table = ((AQjmsSession) session).createQueueTable(user, qTable, qt_prop);
            System.out.println("Qtable created");
            dest_prop = new AQjmsDestinationProperty();
            /* create a queue */
            queue = ((AQjmsSession) session).createQueue(q_table, queueName, dest_prop);
            System.out.println("Queue created");
            /* start the queue (enable both enqueue and dequeue) */
            ((AQjmsDestination) queue).start(session, true, true);
            // release the session and connection, as the other methods do
            session.close();
            QCon.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void sendMessage(String user, String queueName, String message) {
        try {
            QueueConnection QCon = getConnection();
            Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
            QCon.start();
            Queue queue = ((AQjmsSession) session).getQueue(user, queueName);
            MessageProducer producer = session.createProducer(queue);
            TextMessage tMsg = session.createTextMessage(message);
            // set a property on the message, since Axis2 needs it to find the operation
            tMsg.setStringProperty("SOAPAction", "getQuote");
            producer.send(tMsg);
            System.out.println("Sent message = " + tMsg.getText());
            // close the producer before the session that created it
            producer.close();
            session.close();
            QCon.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    public static void browseMessage(String user, String queueName) {
        Queue queue;
        try {
            QueueConnection QCon = getConnection();
            Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
            QCon.start();
            queue = ((AQjmsSession) session).getQueue(user, queueName);
            // a browser reads messages without removing them from the queue
            QueueBrowser browser = session.createBrowser(queue);
            Enumeration<?> enu = browser.getEnumeration();
            List<String> list = new ArrayList<String>();
            while (enu.hasMoreElements()) {
                TextMessage message = (TextMessage) enu.nextElement();
                list.add(message.getText());
            }
            for (String text : list) {
                System.out.println("Browsed msg " + text);
            }
            browser.close();
            session.close();
            QCon.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    public static void consumeMessage(String user, String queueName) {
        Queue queue;
        try {
            QueueConnection QCon = getConnection();
            Session session = QCon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
            QCon.start();
            queue = ((AQjmsSession) session).getQueue(user, queueName);
            MessageConsumer consumer = session.createConsumer(queue);
            // receive() blocks until a message is available
            TextMessage msg = (TextMessage) consumer.receive();
            System.out.println("MESSAGE RECEIVED " + msg.getText());
            // the session uses CLIENT_ACKNOWLEDGE, so confirm receipt explicitly
            msg.acknowledge();
            consumer.close();
            session.close();
            QCon.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        String userName = "jmsuser";
        String queue = "sample_aq";
        String qTable = "sample_aqtbl";
        //createQueue(userName, qTable, queue);
        //sendMessage(userName, queue, "<user>text</user>");
        //browseMessage(userName, queue);
        //consumeMessage(userName, queue);
    }
}
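You will need to copy the required JARs into your Java project from your Oracle DB installation directory. The oracle.jms and oracle.AQ classes imported above come from aqapi.jar; you also need the JMS API and the Oracle JDBC driver on the classpath.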
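If you would rather keep the PL/SQL block from the question than switch to the JMS API, it can also be run from Java through plain JDBC. The following is a minimal sketch, not tested against a real database: the class name `JdbcAqEnqueue` and the method `enqueueText` are made up for this example, and it assumes you already have an open `java.sql.Connection` from the Oracle JDBC driver. The message text and the queue name are passed as bind variables instead of being concatenated into the block.

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.SQLException;

// Hypothetical helper for illustration; not part of any Oracle library.
class JdbcAqEnqueue {

    // The anonymous block from the question, with the message text (first ?)
    // and the queue name (second ?) turned into bind variables.
    static final String ENQUEUE_BLOCK =
          "declare\n"
        + "  r_enqueue_options    dbms_aq.enqueue_options_t;\n"
        + "  r_message_properties dbms_aq.message_properties_t;\n"
        + "  v_message_handle     raw(16);\n"
        + "  o_payload            sys.aq$_jms_text_message;\n"
        + "begin\n"
        + "  o_payload := sys.aq$_jms_text_message.construct;\n"
        + "  o_payload.set_text(?);\n"
        + "  sys.dbms_aq.enqueue(\n"
        + "    queue_name         => ?,\n"
        + "    enqueue_options    => r_enqueue_options,\n"
        + "    message_properties => r_message_properties,\n"
        + "    payload            => o_payload,\n"
        + "    msgid              => v_message_handle);\n"
        + "end;";

    /** Enqueues one text message; conn is assumed to be an open Oracle connection. */
    static void enqueueText(Connection conn, String queueName, String text) throws SQLException {
        try (CallableStatement stmt = conn.prepareCall(ENQUEUE_BLOCK)) {
            stmt.setString(1, text);      // bound to set_text(?)
            stmt.setString(2, queueName); // bound to queue_name => ?
            stmt.execute();
        }
        // The question's block ends with "commit"; do the equivalent here
        // unless the connection already commits automatically.
        if (!conn.getAutoCommit()) {
            conn.commit();
        }
    }

    public static void main(String[] args) {
        // Prints the statement text so it can be inspected without a database.
        System.out.println(ENQUEUE_BLOCK);
    }
}
```

A call would look like `JdbcAqEnqueue.enqueueText(conn, "QUEUE_NAME", "<user>text</user>")`. Binding the text with `setString` passes it as a VARCHAR2, which PL/SQL limits to 32767 bytes, so larger payloads would need to be bound as a CLOB instead.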
Credit goes to Ratha for this article [1]. A few things needed amending, so I modified those and posted the code here.
[1] http://wso2.com/library/tutorials/2011/11/configuring-wso2-esb-with-oracle-as-messaging-media/
Thanks