There are two programs: a subscriber and a publisher. The publisher is able to put the message onto the topic, and the message is sent successfully. When I check the ActiveMQ server in my browser, it shows 1 message enqueued. But when I run the consumer code, it does not receive the message.
Here is the producer code:
    import javax.jms.*;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class producer {
        private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // JMS messages are sent and received using a Session. We will
            // create here a non-transactional session object. If you want
            // to use transactions you should set the first parameter to 'true'
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Topic topic = session.createTopic("testt");
            MessageProducer producer = session.createProducer(topic);

            // We will send a small text message saying 'Hello'
            TextMessage message = session.createTextMessage();
            message.setText("HELLO JMS WORLD");

            // Here we are sending the message!
            producer.send(message);
            System.out.println("Sent message '" + message.getText() + "'");

            connection.close();
        }
    }
After I run this code the output at the console is:
    26 Jan, 2012 2:30:04 PM org.apache.activemq.transport.failover.FailoverTransport doReconnect
    INFO: Successfully connected to tcp://localhost:61616
    Sent message 'HELLO JMS WORLD'
And here is the consumer code:
    import javax.jms.*;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class consumer {
        // URL of the JMS server
        private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;

        // Name of the topic from which we will receive messages from = " testt"

        public static void main(String[] args) throws JMSException {
            // Getting JMS connection from the server
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            Connection connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            Topic topic = session.createTopic("testt");
            MessageConsumer consumer = session.createConsumer(topic);

            MessageListener listner = new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        if (message instanceof TextMessage) {
                            TextMessage textMessage = (TextMessage) message;
                            System.out.println("Received message" + textMessage.getText() + "'");
                        }
                    } catch (JMSException e) {
                        System.out.println("Caught:" + e);
                        e.printStackTrace();
                    }
                }
            };

            consumer.setMessageListener(listner);
            connection.close();
        }
    }
After I run this code, it doesn't show anything. Can someone help me overcome this problem?
Your issue is that your consumer is running and then shutting down immediately.
Try adding this into your consumer:
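    consumer.setMessageListener(listner);
    try {
        // Block the main thread so the listener has time to receive messages
        System.in.read();
    } catch (IOException e) { // needs: import java.io.IOException;
        e.printStackTrace();
    }
    connection.close();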
This keeps the consumer running until input arrives on standard input (on most consoles, when you press Enter), and only then closes the connection.
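If you would rather have the consumer exit on its own once a message has arrived, one common alternative to blocking on System.in is to wait on a CountDownLatch that the listener counts down. The following is only a minimal sketch of that pattern: it uses a plain ExecutorService as a stand-in for the broker's delivery thread so that it runs without ActiveMQ, and the names WaitForMessageDemo and awaitOneMessage are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch only: models "main thread waits until the listener fires".
class WaitForMessageDemo {

    // Returns the delivered text, or null if nothing arrived within the timeout.
    static String awaitOneMessage(long timeoutMillis) {
        CountDownLatch received = new CountDownLatch(1);
        AtomicReference<String> payload = new AtomicReference<>();

        // Stand-in for the broker's delivery thread (assumption). In the real
        // consumer, the body of this task would be the body of onMessage().
        ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
        deliveryThread.submit(() -> {
            payload.set("HELLO JMS WORLD");
            received.countDown(); // signal the waiting main thread
        });

        try {
            // Wait for the callback instead of returning immediately.
            boolean arrived = received.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return arrived ? payload.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        } finally {
            deliveryThread.shutdown(); // plays the role of connection.close()
        }
    }

    public static void main(String[] args) {
        System.out.println("Received: " + awaitOneMessage(5000));
    }
}
```

In the JMS consumer, the equivalent change would be to call countDown() at the end of onMessage and to call await(...) on the latch just before connection.close().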
Other things to consider:
The main problem (besides the app closing down too quickly) is that you are sending to a Topic. Topics don't retain messages, so if you run the producer first and then the consumer, the consumer won't receive anything because it was not subscribed to the topic when the message was sent. If you fix the shutdown issue, start the consumer in one terminal, and then run the producer, you should see the message received by your consumer. If you want message retention, use a Queue, which holds on to each message until someone consumes it.
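To make the difference concrete, here is a toy in-memory model of the two destination types. It is not ActiveMQ code and says nothing about how the broker is implemented; ToyTopic, ToyQueue and the helper methods are hypothetical names used only to illustrate the delivery semantics described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of topic vs. queue semantics (illustration only).
class DestinationSemanticsDemo {

    // Non-durable topic: a message is copied to whoever is subscribed at send time.
    static class ToyTopic {
        private final List<List<String>> subscribers = new ArrayList<>();

        List<String> subscribe() {
            List<String> inbox = new ArrayList<>();
            subscribers.add(inbox);
            return inbox;
        }

        void send(String message) {
            // With no subscribers, the loop does nothing and the message is lost.
            for (List<String> inbox : subscribers) {
                inbox.add(message);
            }
        }
    }

    // Queue: a message is stored until a consumer takes it.
    static class ToyQueue {
        private final Queue<String> pending = new ArrayDeque<>();

        void send(String message) {
            pending.add(message);
        }

        String receive() {
            return pending.poll(); // null when the queue is empty
        }
    }

    // Producer runs first, consumer subscribes afterwards (the situation in the question).
    static List<String> topicSendThenSubscribe() {
        ToyTopic topic = new ToyTopic();
        topic.send("HELLO JMS WORLD");
        return topic.subscribe();
    }

    // Consumer subscribes first, then the producer sends.
    static List<String> topicSubscribeThenSend() {
        ToyTopic topic = new ToyTopic();
        List<String> inbox = topic.subscribe();
        topic.send("HELLO JMS WORLD");
        return inbox;
    }

    // Producer runs first, consumer reads from the queue afterwards.
    static String queueSendThenReceive() {
        ToyQueue queue = new ToyQueue();
        queue.send("HELLO JMS WORLD");
        return queue.receive();
    }

    public static void main(String[] args) {
        System.out.println("topic, late subscriber:  " + topicSendThenSubscribe());
        System.out.println("topic, early subscriber: " + topicSubscribeThenSend());
        System.out.println("queue, late consumer:    " + queueSendThenReceive());
    }
}
```

In the code from the question, the corresponding change would be to replace session.createTopic("testt") with session.createQueue("testt") in both the producer and the consumer.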