I have a PHP application that publishes a user sign-up to a message queue. A Java application reads from that queue and imports it. Hopefully the diagram below describes it. I am only working on the Java side of things. The JSON messages already exist on the queue.
Route (Java Consuming Side).
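@Component
public class SignUpRouting extends RouteBuilder { // assumed: the RouteBuilder superclass was elided in the original post
    @Override
    public void configure() throws Exception {
        // Exchanges that fail are sent to the dead letter exchange with their original body
        errorHandler(deadLetterChannel("rabbitmq://signUpDeadLetter.exchange?username=etc..").useOriginalMessage());
        from("rabbitmq://phpSignUp.exchange?username=etc....")
            .routeId("signUpRoute")
            .processRef("signUpProcessor")
            .end();
        //....
    }
}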
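The processor:
@Component("signUpProcessor") // explicit bean name so processRef("signUpProcessor") resolves
public class SignupProcessor implements Processor {
    private final ObjectMapper mapper = new ObjectMapper();

    // Assumed: the DAO is injected by Spring; its declaration was omitted in the original post
    @Autowired
    private SignUpDao signUpDao;

    @Override
    public void process(Exchange exchange) throws Exception {
        // Read the raw JSON body and map it onto the DTO
        String json = exchange.getIn().getBody(String.class);
        SignUpDto dto = mapper.readValue(json, SignUpDto.class);
        SignUp signUp = new SignUp();
        signUp.setWhatever(dto.getWhatever());
        //etc....
        // save record; any exception thrown here propagates to the route's error handler
        signUpDao.save(signUp);
    }
}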
My question is this: what should I do when the processor fails to import the message?
Let's say, for example, there was a DAO exception. A data field may have been too long, or the import was in the incorrect format. I don't want to lose the message. I would like to see the error and retry the import, but I would not want to keep retrying the message every 30 seconds.
I am thinking that I would need to create another queue, a dead letter queue, and have that retry the message indefinitely every 6 hours. I would then view the logs, see the error, and upload a fix, and the message would be reprocessed.
How would I implement that? Or am I on the wrong track?
EDIT: I have tried setting deadLetterExchange to see if it would get things going in the right direction. However, it errors and says queue cannot be non null:
rabbitmq://phpSignUp.exchange?username=etc...&deadLetterExchange=signUpDeadLetter.exchange
Here is an example that uses the dead letter options:
<from uri="rabbitmq://localhost/youexchange?queue=yourq1&amp;
exchangeType=topic&amp;
routingKey=user.reg.*&amp;
deadLetterExchange=dead.msgs&amp;
deadLetterExchangeType=topic&amp;
deadLetterRoutingKey=dead.letters&amp;
deadLetterQueue=dead.letters&amp;
autoAck=false&amp;
autoDelete=false"/>
<!-- We can use onException to make Camel retry; once the retries are exhausted, the dead letter queue is the fallback -->
<onException useOriginalMessage="true">
<exception>java.lang.Exception</exception>
<redeliveryPolicy asyncDelayedRedelivery="true" maximumRedeliveries="3" redeliveryDelay="5000"/>
</onException>
We need to turn off autoAck and set deadLetterQueue; then, if an exception is thrown, the message ends up in the dead letters queue. With onException, we can control how often Camel retries before it drops the message to the dead letter queue.
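To make the retry-then-dead-letter flow concrete, here is a minimal plain-Java sketch of the behaviour that the redelivery policy above describes. It is not Camel's API and does not talk to RabbitMQ: the class RetryThenDeadLetter, the Handler interface and the deliver method are hypothetical names invented for this illustration, and the dead letter queue is modelled as a simple callback. It only assumes that a failing handler is retried up to maximumRedeliveries times and that the original body is then handed to the dead letter destination.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration only: models "retry N times, then dead-letter" without Camel.
class RetryThenDeadLetter {

    /** Stand-in for a message processor: handles one message body and may throw. */
    interface Handler {
        void handle(String body) throws Exception;
    }

    /**
     * Calls the handler once, then up to maxRedeliveries more times, waiting
     * delayMillis between attempts. Returns true as soon as one attempt succeeds.
     * If every attempt fails, the original body is passed to deadLetter and
     * false is returned.
     */
    static boolean deliver(String body, Handler handler, int maxRedeliveries,
                           long delayMillis, Consumer<String> deadLetter) {
        for (int attempt = 0; attempt <= maxRedeliveries; attempt++) {
            try {
                handler.handle(body);
                return true;
            } catch (Exception e) {
                System.err.println("Attempt " + (attempt + 1) + " failed: " + e.getMessage());
                if (attempt < maxRedeliveries) {
                    try {
                        Thread.sleep(delayMillis);
                    } catch (InterruptedException ie) {
                        // Stop retrying when interrupted and fall through to dead-lettering
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
        // Retries exhausted: keep the original message instead of losing it
        deadLetter.accept(body);
        return false;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        // A handler that always fails, like a DAO rejecting a field that is too long
        boolean processed = deliver("{\"whatever\":\"x\"}",
                body -> { throw new IllegalStateException("data field too long"); },
                3, 10L, deadLetters::add);
        System.out.println("processed=" + processed + ", deadLetters=" + deadLetters.size());
    }
}
```

With maxRedeliveries set to 3, a permanently failing message is attempted four times in total (one initial delivery plus three redeliveries) before it is parked, which is why a fix has to be deployed before the parked messages are fed back in.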