I'm researching queuing solutions for one of my team's apps. Ideally we would like something that can be configured both as a lightweight, in-process broker (for low-throughput messaging between threads) and as an external broker. Is there an MQ server out there that can do this? Most seem to require setup as an external entity. ZeroMQ appears to come the closest to an in-process solution, but it seems to be more of a "UDP socket on steroids", and we need reliable delivery.
From IBM MQ 8.0, the IBM MQ classes for Java are built with Java 7. The Java 7 runtime environment supports running earlier class file versions.
IBM® MQ provides two alternative application programming interfaces (APIs) for use in Java applications: IBM MQ classes for Java Message Service and IBM MQ classes for Java. IBM supports, and is an active participant of, open standards and within the messaging area the API standard is the Java Message Service (JMS).
MQ can act as a native queue mechanism or a transport for JMS messages. The difference being that JMS messages have some standard header fields at the begining of the message buffer and "native" mq messages contain just the data your program sent to the buffer.
Like we said ActiveMQ
is a bit heavier than ZeroMQ
but it work really well as an embedded process. Here a simple example with Spring
and ActiveMQ
.
The message listener that will be used to test the queue :
public class TestMessageListener implements MessageListener { private static final Logger logger = LoggerFactory.getLogger(TestMessageListener.class); @Override public void onMessage(Message message) { /* Receive the text message */ if (message instanceof TextMessage) { try { String text = ((TextMessage) message).getText(); System.out.println("Message reception from the JMS queue : " + text); } catch (JMSException e) { logger.error("Error : " + e.getMessage()); } } else { /* Handle non text message */ } } }
ActiveMQ
context configuration :
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <bean id="jmsQueueConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61617</value> </property> </bean> <bean id="pooledJmsQueueConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <constructor-arg ref="jmsQueueConnectionFactory" /> </bean> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="messageQueue" /> </bean> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <constructor-arg ref="pooledJmsQueueConnectionFactory" /> <property name="pubSubDomain" value="false"/> </bean> <bean id="testMessageListener" class="com.example.jms.TestMessageListener" /> <bean id="messageQueuelistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="pooledJmsQueueConnectionFactory" /> <property name="destination" ref="QueueDestination" /> <property name="messageListener" ref="testMessageListener" /> <property name="concurrentConsumers" value="5" /> <property name="acceptMessagesWhileStopping" value="false" /> <property name="recoveryInterval" value="10000" /> <property name="cacheLevelName" value="CACHE_CONSUMER" /> </bean> </beans>
The JUnit
test :
@ContextConfiguration(locations = {"classpath:/activeMQ-context.xml"}) public class SpringActiveMQTest extends AbstractJUnit4SpringContextTests { @Autowired private JmsTemplate template; @Autowired private ActiveMQDestination destination; @Test public void testJMSFactory() { /* sending a message */ template.convertAndSend(destination, "Hi"); /* receiving a message */ Object msg = template.receive(destination); if (msg instanceof TextMessage) { try { System.out.println(((TextMessage) msg).getText()); } catch (JMSException e) { System.out.println("Error : " + e.getMessage()); } } } }
The Dependencies to add to the pom.xml
:
<!-- Spring --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${org.springframework-version}</version> </dependency> <!-- ActiveMQ --> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.6.0</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.6.0</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-core</artifactId> <version>5.6.0</version> </dependency>
The WebSphere MQ client has the capability to perform multicast pub/sub. This provides a client-to-client capability that bypasses the queue manager, although a queue manager is required to establish the connection.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With