ActiveMQ consumer level timeout

I am trying to create a consumer-level timeout in ActiveMQ (version 5.15.0). Suppose a message is picked up by a consumer that is then unable to acknowledge it. In that case I want the consumer to time out so that the same message can be picked up by another consumer listening to the broker.

My producer code, where I set up two consumer listeners:

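import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JmsMessageListenerAckExample {
  public static void main(String[] args) throws Exception {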
    Connection connection = null;
    try {
      // Producer
      ConnectionFactory factory = createActiveMQConnectionFactory();
      connection = factory.createConnection();
      Session session = connection.createSession(false,
          Session.CLIENT_ACKNOWLEDGE);
      Queue queue = session.createQueue("customerQueue");
      String payload = "Important Task";
      Message msg = session.createTextMessage(payload);
      MessageProducer producer = session.createProducer(queue);

      System.out.println("Sending text '" + payload + "'");
      producer.send(msg);

      // Consumer
      MessageConsumer consumer1 = session.createConsumer(queue);
      consumer1.setMessageListener(
          new AckMessageListener(false, "consumer1"));
      Thread.sleep(1000);
      System.out.println("Creating new message listener to acknowledge");
      producer.send(msg);
      MessageConsumer consumer2 = session.createConsumer(queue);
      consumer2.setMessageListener(
          new AckMessageListener(true, "consumer2"));
      connection.start();

      Thread.sleep(3000);
      session.close();
    } finally {
      if (connection != null) {
        connection.close();
      }
    }
  }

  private static ActiveMQConnectionFactory createActiveMQConnectionFactory() {
    // Create a connection factory.
    final ActiveMQConnectionFactory connectionFactory =
        new ActiveMQConnectionFactory("tcp://localhost:61616");

    // Pass the username and password.
    connectionFactory.setUserName("user");
    connectionFactory.setPassword("user");
    return connectionFactory;
  }
}

This is my consumer listener:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;

public class AckMessageListener implements MessageListener {
  private boolean acknowledge;
  private String consumerName;

  public AckMessageListener(boolean acknowledge, String consumerName) {
    this.acknowledge = acknowledge;
    this.consumerName = consumerName;
  }

  public void onMessage(Message message) {
    boolean terminate = !acknowledge;
    try {

      System.out.println("ConsumerName="+consumerName+", Acknowledge="+acknowledge);
      if (acknowledge) {
        try {
          message.acknowledge();
        } catch (JMSException e1) {
          e1.printStackTrace();
        }
      }

      System.out.println(message);
    } finally {
      if (terminate) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

I want to simulate consumer1 receiving the message but not acknowledging it, so that it times out. To do this I try to release the thread by interrupting it. I expect consumer2 to then pick up the message and acknowledge it, so that the message moves from the "Messages Enqueued" state to the "Messages Dequeued" state in the broker. However, consumer2 does not receive any message events.

Is there something I am doing wrong? How do I achieve a consumer-level timeout with ActiveMQ?

asked Nov 07 '22 01:11 by subro


1 Answer

One way of handling this would be to use transactions (http://activemq.apache.org/how-do-transactions-work.html). You call commit() on success and rollback() on failure. If the session is closed before commit() is called, redelivery will occur (http://activemq.apache.org/message-redelivery-and-dlq-handling.html).
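
To make the commit/close semantics concrete, here is a minimal in-memory sketch in plain Java. It is a toy model, not the ActiveMQ or JMS API: ToyQueue, ToySession and simulate are hypothetical names made up for illustration. In real JMS code the corresponding calls would be connection.createSession(true, Session.SESSION_TRANSACTED), session.commit() and session.rollback(). The sketch assumes each consumer has its own session and ignores prefetch, redelivery delays, redelivery limits and the DLQ, all of which affect what a real broker does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy in-memory model of transacted consumption. NOT the ActiveMQ/JMS API.
class TransactedRedeliveryModel {

    /** Hypothetical stand-in for a broker queue. */
    static final class ToyQueue {
        private final Deque<String> pending = new ArrayDeque<>();

        void send(String body) {
            pending.addLast(body);
        }

        int size() {
            return pending.size();
        }
    }

    /** Hypothetical stand-in for a transacted session owning one consumer. */
    static final class ToySession {
        private final ToyQueue queue;
        // Messages received but neither committed nor rolled back yet.
        private final Deque<String> inFlight = new ArrayDeque<>();

        ToySession(ToyQueue queue) {
            this.queue = queue;
        }

        /** Takes the next message, or returns null if the queue is empty. */
        String receive() {
            String body = queue.pending.pollFirst();
            if (body != null) {
                inFlight.addLast(body);
            }
            return body;
        }

        /** Success: in-flight messages are consumed for good. */
        void commit() {
            inFlight.clear();
        }

        /** Failure: in-flight messages return to the head of the queue in their original order. */
        void rollback() {
            while (!inFlight.isEmpty()) {
                queue.pending.addFirst(inFlight.pollLast());
            }
        }

        /** Closing without a commit is modelled as a rollback. */
        void close() {
            rollback();
        }
    }

    /** Replays the scenario from the question and returns a log of what happened. */
    static List<String> simulate() {
        List<String> log = new ArrayList<>();
        ToyQueue queue = new ToyQueue();
        queue.send("Important Task");

        // consumer1 receives the message but never commits; its session is then closed.
        ToySession consumer1 = new ToySession(queue);
        String first = consumer1.receive();
        consumer1.close();
        log.add("consumer1 closed without commit: " + first);

        // consumer2, on its own session, now sees the same message and commits.
        ToySession consumer2 = new ToySession(queue);
        String second = consumer2.receive();
        consumer2.commit();
        log.add("consumer2 committed: " + second);

        log.add("left on queue: " + queue.size());
        return log;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

The point of the model is that an uncommitted message is only handed out again once the session that holds it rolls back or closes. There is no timer involved, so a "consumer timeout" has to be built from one of those two events.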

answered Nov 11 '22 07:11 by adewan