Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to simulate message redelivery in AUTO_ACKNOWLEDGE JMS Session Scenario?

In the following test I'm trying to simulate the following scenario:

  1. A message queue is started.
  2. A consumer designed to fail during message processing is started.
  3. A message is produced.
  4. The consumer starts processing the message.
  5. During processing an exception is thrown to simulate message processing failure. The failing consumer is stopped.
  6. Another consumer is started with the intent to pick up the redelivered message.

But my test fails and the message is not redelivered to the new consumer. I'll appreciate any hints on this.

MessageProcessingFailureAndReprocessingTest.java

@ContextConfiguration(locations="com.prototypo.queue.MessageProcessingFailureAndReprocessingTest$ContextConfig",
        loader=JavaConfigContextLoader.class)
public class MessageProcessingFailureAndReprocessingTest  extends AbstractJUnit4SpringContextTests {
    @Autowired
    private FailureReprocessTestScenario testScenario;

    @Before
    public void setUp() {
        testScenario.start();
    }

    @After
    public void tearDown() throws Exception {
        testScenario.stop();
    }

    @Test public void 
    should_reprocess_task_after_processing_failure() {
        try {
            Thread.sleep(20*1000);

            assertThat(testScenario.succeedingWorker.processedTasks, is(Arrays.asList(new String[]{
                    "task-1",
            })));
        } catch (InterruptedException e) {
            fail();
        }
    }

    @Configurable
    public static class FailureReprocessTestScenario {
        @Autowired
        public BrokerService broker;

        @Autowired
        public MockTaskProducer mockTaskProducer;

        @Autowired
        public FailingWorker failingWorker;

        @Autowired
        public SucceedingWorker succeedingWorker;

        @Autowired
        public TaskScheduler scheduler;

        public void start() {
            Date now = new Date();
            scheduler.schedule(new Runnable() {
                public void run() { failingWorker.start(); }
            }, now);

            Date after1Seconds = new Date(now.getTime() + 1*1000);
            scheduler.schedule(new Runnable() {
                public void run() { mockTaskProducer.produceTask(); }
            }, after1Seconds);

            Date after2Seconds = new Date(now.getTime() + 2*1000);
            scheduler.schedule(new Runnable() {
                public void run() {
                    failingWorker.stop();
                    succeedingWorker.start();
                }
            }, after2Seconds);
        }

        public void stop() throws Exception {
            succeedingWorker.stop();
            broker.stop();
        }
    }

    @Configuration
    @ImportResource(value={"classpath:applicationContext-jms.xml",
            "classpath:applicationContext-task.xml"})
    public static class ContextConfig {
        @Autowired
        private ConnectionFactory jmsFactory;

        @Bean
        public FailureReprocessTestScenario testScenario() {
            return new FailureReprocessTestScenario();
        }

        @Bean
        public MockTaskProducer mockTaskProducer() {
            return new MockTaskProducer();
        }

        @Bean
        public FailingWorker failingWorker() {
            TaskListener listener = new TaskListener();
            FailingWorker worker = new FailingWorker(listenerContainer(listener));
            listener.setProcessor(worker);
            return worker;
        }

        @Bean
        public SucceedingWorker succeedingWorker() {
            TaskListener listener = new TaskListener();
            SucceedingWorker worker = new SucceedingWorker(listenerContainer(listener));
            listener.setProcessor(worker);
            return worker;
        }

        private DefaultMessageListenerContainer listenerContainer(TaskListener listener) {
            DefaultMessageListenerContainer listenerContainer = new DefaultMessageListenerContainer();
            listenerContainer.setConnectionFactory(jmsFactory);
            listenerContainer.setDestinationName("tasksQueue");
            listenerContainer.setMessageListener(listener);
            listenerContainer.setAutoStartup(false);
            listenerContainer.initialize();
            return listenerContainer;
        }

    }

    public static class FailingWorker implements TaskProcessor {
        private Logger LOG = Logger.getLogger(FailingWorker.class.getName());

        private final DefaultMessageListenerContainer listenerContainer;

        public FailingWorker(DefaultMessageListenerContainer listenerContainer) {
            this.listenerContainer = listenerContainer;
        }

        public void start() {
            LOG.info("FailingWorker.start()");
            listenerContainer.start();
        }

        public void stop() {
            LOG.info("FailingWorker.stop()");
            listenerContainer.stop();
        }

        @Override
        public void processTask(Object task) {
            LOG.info("FailingWorker.processTask(" + task + ")");
            try {
                Thread.sleep(1*1000);
                throw Throwables.propagate(new Exception("Simulate task processing failure"));
            } catch (InterruptedException e) {
                LOG.log(Level.SEVERE, "Unexpected interruption exception");
            }
        }
    }

    public static class SucceedingWorker implements TaskProcessor {
        private Logger LOG = Logger.getLogger(SucceedingWorker.class.getName());

        private final DefaultMessageListenerContainer listenerContainer;

        public final List<String> processedTasks;

        public SucceedingWorker(DefaultMessageListenerContainer listenerContainer) {
            this.listenerContainer = listenerContainer;
            this.processedTasks = new ArrayList<String>();
        }

        public void start() {
            LOG.info("SucceedingWorker.start()");
            listenerContainer.start();
        }

        public void stop() {
            LOG.info("SucceedingWorker.stop()");
            listenerContainer.stop();
        }

        @Override
        public void processTask(Object task) {
            LOG.info("SucceedingWorker.processTask(" + task + ")");
            try {
                TextMessage taskText = (TextMessage) task;
                processedTasks.add(taskText.getText());
            } catch (JMSException e) {
                LOG.log(Level.SEVERE, "Unexpected exception during task processing");
            }
        }
    }

}

TaskListener.java

public class TaskListener implements MessageListener {

    private TaskProcessor processor;

    @Override
    public void onMessage(Message message) {
        processor.processTask(message);
    }

    public void setProcessor(TaskProcessor processor) {
        this.processor = processor;
    }

}

MockTaskProducer.java

@Configurable
public class MockTaskProducer implements ApplicationContextAware {
    private Logger LOG = Logger.getLogger(MockTaskProducer.class.getName());

    @Autowired
    private JmsTemplate jmsTemplate;

    private Destination destination;

    private int taskCounter = 0;

    public void produceTask() {
        LOG.info("MockTaskProducer.produceTask(" + taskCounter + ")");

        taskCounter++;

        jmsTemplate.send(destination, new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage message = session.createTextMessage("task-" + taskCounter);
                return message;
            }
        });
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext)
            throws BeansException {
        destination = applicationContext.getBean("tasksQueue", Destination.class);
    }
}
like image 934
Vladimir Tsvetkov Avatar asked Mar 26 '12 11:03

Vladimir Tsvetkov


1 Answers

Apparently the source of documentation I was looking yesterday Creating Robust JMS Applications mislead me in a way (or I might have understood it incorrectly). Especially that excerpt:

Until a JMS message has been acknowledged, it is not considered to be successfully consumed. The successful consumption of a message ordinarily takes place in three stages.

  1. The client receives the message.
  2. The client processes the message.
  3. The message is acknowledged. Acknowledgment is initiated either by the JMS provider or by the client, depending on the session acknowledgment mode.

I assumed AUTO_ACKNOWLEDGE does exactly that - acknowledged the message after the listener method returns a result. But according to the JMS specification it is a bit different and Spring listener containers as expected do not try to alter the behavior from the JMS specification. This is what the javadoc of AbstractMessageListenerContainer has to say - I've emphasized the important sentences:

The listener container offers the following message acknowledgment options:

  • "sessionAcknowledgeMode" set to "AUTO_ACKNOWLEDGE" (default): Automatic message acknowledgment before listener execution; no redelivery in case of exception thrown.
  • "sessionAcknowledgeMode" set to "CLIENT_ACKNOWLEDGE": Automatic message acknowledgment after successful listener execution; no redelivery in case of exception thrown.
  • "sessionAcknowledgeMode" set to "DUPS_OK_ACKNOWLEDGE": Lazy message acknowledgment during or after listener execution; potential redelivery in case of exception thrown.
  • "sessionTransacted" set to "true": Transactional acknowledgment after successful listener execution; guaranteed redelivery in case of exception thrown.

So the key to my solution is listenerContainer.setSessionTransacted(true);

Another issue I faced was that the JMS provider keeps redelivering the failed message back to the same consumer that had failed during the processing of the message. I don't know if the JMS specification gives a prescription what the provider should do in such situations, but what have worked for me was to use listenerContainer.shutdown(); in order to disconnect the failing consumer and allow the provider to redeliver the message and give a chance to another consumer.

like image 183
Vladimir Tsvetkov Avatar answered Nov 15 '22 10:11

Vladimir Tsvetkov