
How to make sure that a message from a JMS queue is delivered to an external web service (CXF)?

The question

How should one configure ActiveMQ and a <flow> in Mule ESB 3.2 to make sure that a message pulled from a queue ends up properly handled by an external CXF service?

Scenario

I have a CXF endpoint which should take an incoming message and transfer it to three external services as soon as possible. Let's call them EX1, EX2, EX3. This is fairly easy thanks to the <all> component introduced in Mule 3.x.

The most important requirement of the whole solution is to make sure that each received message ends up being delivered to all three CXF services. So we came up with the idea of putting each incoming message into persistent JMS queues (Q1, Q2, Q3). After a message is read from queue Qn, it is transferred directly to the corresponding EXn endpoint, and thus to the external service.

Config

(I can provide full config upon request)

We have configured the ActiveMQ broker as described here and wired it up with our <flow> config. Everything seems to work as expected: I have JConsole connected to my application, so I can see that messages are of type PERSISTENT and that they end up in the proper queues. If everything goes smoothly, messages are received by all three services EXn.

Tests

The problem arises when we turn off one of the services, let's say EX2, and restart the whole server to simulate a failure. The message ends up being lost (I guess it's not that persistent, huh?). The most curious thing is that if we send 10 messages while EX2 is down, 9 of them are properly redelivered after the server restart! So I'm thinking that maybe, just maybe, 9 of those 10 messages were properly enqueued, while the remaining one was being constantly redelivered when the server went down.

This makes me think that the CXF endpoint is not being handled with transaction support, which I cannot understand, to be honest. After all, I can see the message in the queue while redelivery is being attempted, so it should be persisted. It's clearly not, but why?

My own attempts

I have tried a number of things, none of which have worked. One message always gets lost.

  1. Not using any <jms:transaction /> tags within the flows - didn't work
  2. Starting a JMS transaction upon message receipt and joining it while sending to <cxf:jaxws-client />
  3. Using XA with JBoss and <xa-transaction /> - didn't work
  4. Providing a <default-exception-strategy> config - if I recall correctly, it made things worse

Any help is appreciated, thanks.

CONFIG

ACTIVE MQ CONFIGURATION

<spring:bean id="AmqDefaultPolicyEntry" class="org.apache.activemq.broker.region.policy.PolicyEntry">
    <spring:property name="queue" value="queue.*"/>
    <spring:property name="deadLetterStrategy" ref="AmqDeadLetterStrategy"/>
</spring:bean>

<spring:bean id="AmqPolicyMap" class="org.apache.activemq.broker.region.policy.PolicyMap">
    <spring:property name="defaultEntry" ref="AmqDefaultPolicyEntry"/>
</spring:bean>

<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
    <spring:property name="brokerURL" value="vm://localhost?jms.prefetchPolicy.all=1&amp;broker.persistent=true&amp;broker.useJmx=true"/>
    <spring:property name="redeliveryPolicy">
        <spring:bean class="org.apache.activemq.RedeliveryPolicy">
            <spring:property name="initialRedeliveryDelay" value="${props.initialRedeliveryDelay}"/>
            <spring:property name="redeliveryDelay" value="${props.redeliveryDelay}"/>
            <spring:property name="maximumRedeliveries" value="${props.maximumRedeliveries}"/>
            <spring:property name="backOffMultiplier" value="${props.backOffMultiplier}"/>
        </spring:bean>
    </spring:property>
</spring:bean>

<spring:bean name="persistenceAdapter" class="org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter">
    <spring:property name="directory" value="/home/bachman/activemq"/>
</spring:bean>

<spring:bean name="AmqBroker"
             class="org.apache.activemq.broker.BrokerService"
             init-method="start"
             destroy-method="stop">
    <spring:property name="brokerName" value="esb-amq-broker"/>
    <spring:property name="persistent" value="true"/>
    <spring:property name="dataDirectory" value="/home/bachman/activemq"/>
    <spring:property name="useJmx" value="true"/>
    <spring:property name="useShutdownHook" value="false"/>
    <spring:property name="persistenceAdapter" ref="persistenceAdapter"/>
    <spring:property name="destinationPolicy" ref="AmqPolicyMap"/>
</spring:bean>

<jms:activemq-connector name="PersistentJMSConnector" specification="1.1"
                        numberOfConsumers="1" maxRedelivery="-1" persistentDelivery="true"
                        connectionFactory-ref="connectionFactory" acknowledgementMode="CLIENT_ACKNOWLEDGE"
                        disableTemporaryReplyToDestinations="true"/>

FLOW - dispatch incoming message to 3 queues Qn

<flow name="dispatch-to-queues">
        <inbound-endpoint ref="incoming-cxf"/>

        <!-- Each received message ends up to be sent to all destinations -->
        <all>
            <jms:outbound-endpoint name="queue.q1"
                queue="queue.q1" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on Q1"
                    connector-ref="PersistentJMSConnector"/>

            <jms:outbound-endpoint name="queue.q2"
                queue="queue.q2" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on q2"
                connector-ref="PersistentJMSConnector" />

            <jms:outbound-endpoint name="queue.q3"
                queue="queue.q3" disableTransportTransformer="false"
                disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
                doc:name="JMS" doc:description="Receive messages on Q3"
                connector-ref="PersistentJMSConnector" />

        </all>
        <custom-processor class="com.mycompany.just.a.component.to.return.OK.via.Cxf" />
    </flow>

FLOW - handle delivery from Qn to EXn

<flow name="from-q1-to-ex1">
        <jms:inbound-endpoint queue="queue.q1" disableTransportTransformer="false"
            disableTemporaryReplyToDestinations="false" exchange-pattern="one-way"
            doc:name="JMS" doc:description="Pull from q1."
            connector-ref="PersistentJMSConnector">
                <jms:transaction action="ALWAYS_BEGIN" />
        </jms:inbound-endpoint>
        <logger message="Sending message to EX-1" level="INFO" />

        <!-- Handle errors at this point in flow
        <custom-processor class="pl.exception.lookup.Component">
            <spring:property name="targetModuleName" value="Not-important"/>
        </custom-processor>
        -->


        <outbound-endpoint ref="ex1-cxf-endpoint">
            <jms:transaction action="ALWAYS_JOIN" timeout="360000"/>
        </outbound-endpoint>
    </flow>

ENDPOINTS - declaration of referred endpoints

<endpoint name="incoming-cxf" address="http://incoming.mycompany.com/in" exchange-pattern="request-response">
        <cxf:jaxws-service serviceClass="com.mycompany.services.InService"/>
    </endpoint> 

<endpoint name="ex1-cxf-endpoint" address="http://com.mycompany.ex1" exchange-pattern="request-response">
        <cxf:jaxws-client
                clientClass="com.mycompany.services.Ex1"
                wsdlLocation="classpath:wsdl/ex1.wsdl"
                operation="someOperation"
                port="SomePort"/>
    </endpoint>

ŁukaszBachman asked Jan 24 '12 14:01


2 Answers

Consuming JMS messages in a transaction is a must for the solution to work as expected: if an exception occurs in the CXF outbound phase, the JMS message will be rolled back and then redelivered, triggering a new CXF call.

You must configure the redelivery policy of your ActiveMQ client carefully so that it retries enough times and not too quickly (with exponential back-off, for example). You also want to handle the DLQ appropriately. ActiveMQ client configuration with Spring beans in Mule is shown here: http://www.mulesoft.org/mule-activemq-integration-examples
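
As a rough illustration of the redelivery and DLQ advice, the fragment below sketches a RedeliveryPolicy with exponential back-off and a per-queue dead-letter strategy. It is meant to sit inside the existing Mule configuration, which already declares the spring prefix. All numeric values are assumptions chosen only for illustration, and the bean id exampleRedeliveryPolicy is hypothetical. The id AmqDeadLetterStrategy comes from the question, which does not show its definition, so the strategy class and settings here are only one plausible setup.

```xml
<!-- Sketch only: every numeric value below is an illustrative assumption. -->
<spring:bean id="exampleRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    <!-- Delay before the first redelivery attempt, in milliseconds -->
    <spring:property name="initialRedeliveryDelay" value="1000"/>
    <spring:property name="redeliveryDelay" value="1000"/>
    <!-- With exponential back-off enabled, each delay is the previous one times backOffMultiplier -->
    <spring:property name="useExponentialBackOff" value="true"/>
    <spring:property name="backOffMultiplier" value="2"/>
    <!-- Upper bound for the delay when exponential back-off is used, in milliseconds -->
    <spring:property name="maximumRedeliveryDelay" value="300000"/>
    <!-- After this many redeliveries the message is routed to the dead-letter queue -->
    <spring:property name="maximumRedeliveries" value="10"/>
</spring:bean>

<!-- One dead-letter queue per source queue, e.g. DLQ.queue.q2 for queue.q2 -->
<spring:bean id="AmqDeadLetterStrategy"
             class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy">
    <spring:property name="queuePrefix" value="DLQ."/>
    <spring:property name="useQueueForQueueMessages" value="true"/>
</spring:bean>
```

With a finite maximumRedeliveries, messages that keep failing end up in a dead-letter queue that something must monitor and drain. A value of -1 retries indefinitely, so the DLQ is never used for redelivery failures.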

Also be sure to refer to the right broker URL in your connection factory. With your broker name of esb-amq-broker, your connection factory should be:

<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
    <spring:property name="brokerURL" value="vm://esb-amq-broker"/>
    ...
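
For comparison, a fuller version of that bean might look like the sketch below. It combines the broker URL suggested here with the prefetch setting and the redelivery policy placeholders from the question. The create=false option is not in the original snippet; it is added on the assumption that you would rather have the VM transport refuse to connect than create a second embedded broker on demand when esb-amq-broker cannot be found.

```xml
<spring:bean name="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory" depends-on="AmqBroker">
    <!-- vm://esb-amq-broker targets the embedded broker declared with brokerName="esb-amq-broker".
         create=false is an assumption: it stops the VM transport from creating a new broker on demand. -->
    <spring:property name="brokerURL" value="vm://esb-amq-broker?create=false&amp;jms.prefetchPolicy.all=1"/>
    <spring:property name="redeliveryPolicy">
        <spring:bean class="org.apache.activemq.RedeliveryPolicy">
            <!-- Placeholders taken from the question's configuration -->
            <spring:property name="initialRedeliveryDelay" value="${props.initialRedeliveryDelay}"/>
            <spring:property name="redeliveryDelay" value="${props.redeliveryDelay}"/>
            <spring:property name="maximumRedeliveries" value="${props.maximumRedeliveries}"/>
            <spring:property name="backOffMultiplier" value="${props.backOffMultiplier}"/>
        </spring:bean>
    </spring:property>
</spring:bean>
```

The broker.persistent=true and broker.useJmx=true options from the question's URL are dropped in this sketch, since they configure a broker that the VM transport creates itself, whereas here the broker is defined by the AmqBroker bean.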

David Dossot answered Nov 15 '22 22:11


I don't know if this will help much, but here are a couple of suggestions regarding your problem:

  • Have you tried a transaction manager other than the one provided with JBoss? I would suggest using Atomikos for such tests.
  • As David suggested, transactions seem to be the best approach, but another option would be an explicit acknowledgment policy. It may be tricky to set up, but an interceptor-like approach could watch for connections to specific endpoints and send the ack back to your JMS server. It may be difficult, but it would ensure that the message has been correctly delivered.

Good luck, HTH. Jerome

romje answered Nov 15 '22 22:11