Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

RabbitMQ - Apache Camel Reading Messages what to do with failed messages

I have the following PHP application. That publishes a user signUp to a message queue. The Java Application reads from that queue and imports it. Hopefully the diagram below will discribe it. I am only working with the Java side of things. The json messages exists on the queue already.

enter image description here

Route (Java Consuming Side).

@Component
public class SignUpRouting {

  errorHandler(deadLetterChannel("rabbitmq://signUpDeadLetter.exchange?username=etc..").useOriginalMessage());

  from("rabbitmq://phpSignUp.exchange?username=etc....")
            .routeId("signUpRoute")
            .processRef("signUpProcessor")
            .end();
  //.... 

The processor..

@Component
public class SignupProcessor implements Processor {

    private ObjectMapper mapper = new ObjectMapper();

    @Override
    public void process(Exchange exchange) throws Exception {

        String json = exchange.getIn().getBody(String.class);
        SignUpDto dto = mapper.readValue(json, SignUpDto.class);

        SignUp signUp = new SignUp();
        signUp.setWhatever(dto.getWhatever());
        //etc....

        // save record
        signUpDao.save(signUp);
    }
}

My question is this.. What should I do I do when the Processor fails to import the message.

Lets say for example there was a DAO exception. A data field may have been toolong or the import was in the incorrect format. I dont want to lose the message. I would like to see the error and retry the import. But I would not want to keep retrying the message every 30 seconds.

I am thinking that I would need to create another queue.. A dead letter queue and have that indefinately retry the message every 6 hours?.. I would then view the logs see the error and upload a fix and the message would be reprocessed?

How would I implement that? Or am I on the wrong track?

EDIT I have tried setting deadLetterExchange to see if would get things on the right direction... However it errors and says queue cannot be non null

 rabbitmq://phpSignUp.exchange?username=etc...&deadLetterExchange=signUpDeadLetter.exchange
like image 645
Robbo_UK Avatar asked Oct 17 '15 19:10

Robbo_UK


1 Answers

Here is a example to use dead letter headers:

        <from uri="rabbitmq://localhost/youexchange?queue=yourq1&amp;
            exchangeType=topic&amp;
            routingKey=user.reg.*&amp;
            deadLetterExchange=dead.msgs&amp;
            deadLetterExchangeType=topic&amp;
            deadLetterRoutingKey=dead.letters&amp;
            deadLetterQueue=dead.letters&amp;
            autoAck=false&amp;
            autoDelete=false"/>

          <!--We can use onException to make camel to retry, and after that, dead letter queue are the fallback-->
        <onException useOriginalMessage="true">
            <exception>java.lang.Exception</exception>
            <redeliveryPolicy asyncDelayedRedelivery="true" maximumRedeliveries="3" redeliveryDelay="5000"/>
        </onException>

We need to turn off autoAck and set deadLetterQueue, then if there is an Exception thrown, the message will be in dead letters queue. To use onException, we can control the retrying before camel dropping the message to dead letter queue.

like image 138
sanigo Avatar answered Oct 23 '22 02:10

sanigo