The question
How one should configure ActiveMQ
and <flow>
in Mule ESB 3.2
, in order to make sure that message pulled from queue ends up properly handled by external CXF service
?
Scenario
I have an CXF endpoint, which should take incoming message and transfer it to three external services as soon as possible. Let's call them EX1, EX2, EX3. This is fairly easy, thanks to the <all>
component introduced in Mule 3.x.
The most important requirement of the whole solution, is to make sure that each received message ends up being delivered to all three CXF services. So we ended up with the idea, to put each incoming message into Persistent JMS queues
(Q1, Q2, Q3). After message is being read from queue Qn, it's transfered directly to corresponding EXn endpoint, and thus - external service.
Config
(I can provide full config upon request)
We have configured ActiveMQ broker as described here and wired it up with our <flow>
config. Everything seems to work as expected, I have JConsole connected to my application so I can see that messages are of type PERSISTENT and they end up in proper queues. If everything goes smoothly - messages are received by all three services EXn.
Tests
The problem arrises when we turn off one of the services, let's say EX2, and restart the whole server simulating failure. The message ends up being lost (I guess it's not that persistent, huh?). The most curious thing is - If we sent 10 messages when the EX2 is down, after server restart 9 of them are being properly redelivered! So I'm thinking that maybe, just maybe, 9 of those 10 messages were properly enqueued, while the one was being constantly redelivered when the server failed down.
This causes me to think, that CXF endpoint is not being handled with transaction support, which I cannot understand, to be honest. After all I can see the message being in the queue when it's trying to be redelivered, so it should be persisted. It's clearly not, but why?
My own attempts I have tried a number of things, none of which have worked. Always one message gets lost.
<jms:transaction />
tags within the flows - didn't work<cxf:jaxws-client />
<xa-transaction />
- didn't work<default-exception-strategy>
config - If I recall it made things worstAny help is appreciated, thanks.
ACTIVE MQ CONFIGURATION
<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
<spring:property name="queue" value="queue.*"/>
<spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>
<spring:bean id="AmqPolicyMap" class="org.apache.activemq.broker.region.policy.PolicyMap">
<spring:property name="defaultEntry" ref="AmqDefaultPolicyEntry"/>
</spring:bean>
<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
<spring:property name="brokerURL" value="vm://localhost?jms.prefetchPolicy.all=1&broker.persistent=true&broker.useJmx=true"/>
<spring:property name="redeliveryPolicy">
<spring:bean class="org.apache.activemq.RedeliveryPolicy">
<spring:property name="initialRedeliveryDelay" value="${props.initialRedeliveryDelay}"/>
<spring:property name="redeliveryDelay" value="${props.redeliveryDelay}"/>
<spring:property name="maximumRedeliveries" value="${props.maximumRedeliveries}"/>
<spring:property name="backOffMultiplier" value="${props.backOffMultiplier}"/>
</spring:bean>
</spring:property>
</spring:bean>
<spring:bean name="persistenceAdapter" class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
<spring:property name="directory" value="/home/bachman/activemq"/>
</spring:bean>
<spring:bean name="AmqBroker"
class="org.apache.activemq.broker.BrokerService"
init-method="start"
destroy-method="stop">
<spring:property name="brokerName" value="esb-amq-broker"/>
<spring:property name="persistent" value="true"/>
<spring:property name="dataDirectory" value="/home/bachman/activemq"/>
<spring:property name="useJmx" value="true"/>
<spring:property name="useShutdownHook" value="false"/>
<spring:property name="persistenceAdapter" ref="persistenceAdapter"/>
<spring:property name="destinationPolicy" ref="AmqPolicyMap"/>
</spring:bean>
<jms:activemq-connector name="PersistentJMSConnector" specification="1.1"
numberOfConsumers="1" maxRedelivery="-1" persistentDelivery="true"
connectionFactory-ref="connectionFactory" acknowledgementMode="CLIENT_ACKNOWLEDGE"
disableTemporaryReplyToDestinations="true"/>
FLOW - dispatch incoming message to 3 queues Qn
<flow name="dispatch-to-queues">
<inbound-endpoint ref="incoming-cxf"/>
<!-- Each received message ends up to be sent to all destinations -->
<all>
<jms:outbound-endpoint name="queue.q1"
queue="queue.q1" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on Q1"
connector-ref="PersistentJMSConnector"/>
<jms:outbound-endpoint name="queue.q2"
queue="queue.q2" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on q2"
connector-ref="PersistentJMSConnector" />
<jms:outbound-endpoint name="queue.q3"
queue="queue.q3" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Receive messages on Q3"
connector-ref="PersistentJMSConnector" />
</all>
<custom-processor class="com.mycompany.just.a.component.to.return.OK.via.Cxf" />
</flow>
FLOW - handle delivery from Qn to EXn
<flow name="from-q1-to-ex1">
<jms:inbound-endpoint queue="queue.q1" disableTransportTransformer="false"
disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
doc:name="JMS" doc:description="Pull from q1."
connector-ref="PersistentJMSConnector">
<jms:transaction action="ALWAYS_BEGIN" />
</jms:inbound-endpoint>
<logger message="Sending message to EX-1" level="INFO" />
<!-- Handle errors at this point in flow
<custom-processor class="pl.exception.lookup.Component">
<spring:property name="targetModuleName" value="Not-important"/>
</custom-processor>
-->
<outbound-endpoint ref="ex1-cxf-endpoint">
<jms:transaction action="ALWAYS_JOIN" timeout="360000"/>
</outbound-endpoint>
</flow>
ENDPOINTS - declaration of referred endpoints
<endpoint name="incoming-cxf" address="http://incoming.mycompany.com/in" exchange-pattern="request-response">
<cxf:jaxws-service serviceClass="com.mycompany.services.InService"/>
</endpoint>
<endpoint name="ex1-cxf-endpoint" address="http://com.mycompany.ex1" exchange-pattern="request-response">
<cxf:jaxws-client
clientClass="com.mycompany.services.Ex1"
wsdlLocation="classpath:wsdl/ex1.wsdl"
operation="someOperation"
port="SomePort"/>
</endpoint>
The parameter destination is a Queue or Topic object that the application has created previously. The application then uses the receive() method of the MessageConsumer object to receive a message from the destination, as shown in the following example: Message inMessage = consumer. receive(1000);
JMS supports two different message delivery models: Point-to-Point (Queue destination): In this model, a message is delivered from a producer to one consumer. The messages are delivered to the destination, which is a queue, and then delivered to one of the consumers registered for the queue.
Consuming JMS messages in a transaction is a must for the solution to work as expected: if an exception occurs in the CXF outbound phase, the JMS message will end-up rolled back, then redelivered, triggering a new CXF call.
You must carefully configure the redelivery policy for your ActiveMQ client in order to retry enough times and maybe not too fast (exponential back-off for example). You also want to handle the DLQ appropriately. ActiveMQ's client configuration with Spring Beans in Mule is shown: http://www.mulesoft.org/mule-activemq-integration-examples
Also be sure to refer to the right broker URL in your configuration factory. With your broker name of esb-amq-broker, your configuration factory should be:
<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
<spring:property name="brokerURL" value="vm://esb-amq-broker"/>
...
I don't know if I will help you much but this is a couple of suggestions regarding your problem:
Good luck HTH jerome
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With