
Spring integration: how to handle exceptions in services after an aggregator?

I have an application relying on Spring Integration (4.0.4.RELEASE) and RabbitMQ. My flow is as follows:

One process puts messages in the queue (it does not expect any answer): Gateway -> Channel -> RabbitMQ

And then drained by another process:

RabbitMQ --1--> inbound-channel-adapter A --2--> chain B --3--> aggregator C --4--> service-activator D --5--> final service-activator E

Explanations & context

One peculiarity: I am not using a splitter anywhere in my application. Aggregator C simply waits for enough messages to arrive, or for a timeout to expire, and then forwards the batch to service D. Messages can stay in aggregator C for quite a long time, and should NOT be considered consumed there. They should only be considered consumed once service D successfully completes. Therefore, I am using MANUAL acknowledgement on inbound-channel-adapter A, and service E is in charge of acknowledging the batch.

Custom aggregator

I solved the acknowledgement issue I had in AUTO mode by redefining the aggregator. In AUTO mode, messages are acknowledged immediately if any asynchronous process occurs in the flow (see question here). Therefore, I switched to MANUAL acknowledgement and implemented the aggregator like this:

     <bean class="org.springframework.integration.config.ConsumerEndpointFactoryBean">
        <property name="inputChannel" ref="channel3"/>
        <property name="handler">
            <bean class="org.springframework.integration.aggregator.AggregatingMessageHandler">
                <constructor-arg name="processor">
                    <bean class="com.test.AMQPAggregator"/>
                </constructor-arg>
                <property name="correlationStrategy">
                    <bean class="com.test.AggregatorDefaultCorrelationStrategy" />
                </property>
                <property name="releaseStrategy">
                    <bean class="com.test.AggregatorMongoReleaseStrategy" />
                </property>
                <property name="messageStore" ref="messageStoreBean"/>
                <property name="expireGroupsUponCompletion" value="true"/>
                <property name="sendPartialResultOnExpiry" value="true"/>
                <property name="outputChannel" ref="channel4"/>
            </bean>
        </property>
    </bean>

    <bean id="messageStoreBean" class="org.springframework.integration.store.SimpleMessageStore"/>

    <bean id="messageStoreReaperBean" class="org.springframework.integration.store.MessageGroupStoreReaper">
        <property name="messageGroupStore" ref="messageStoreBean" />
        <property name="timeout" value="${myapp.timeout}" />
    </bean>

    <task:scheduled-tasks>
        <task:scheduled ref="messageStoreReaperBean" method="run" fixed-rate="2000" />
    </task:scheduled-tasks>

I wanted to aggregate the headers in a different way, keeping the highest value of all the amqp_deliveryTag headers for a later multi-acknowledgement in service E (see this thread). This works great so far, apart from being far more verbose than the typical aggregator namespace (see this old Jira ticket).
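To illustrate the idea of keeping the highest amqp_deliveryTag, here is a minimal plain-Java sketch. It is not the actual com.test.AMQPAggregator from the question (that code is not shown); the class DeliveryTags, the method highest, and the representation of headers as plain maps are assumptions made for illustration only. The reason the highest tag is enough is that in RabbitMQ, acknowledging a delivery tag with the "multiple" flag set also acknowledges all earlier unacknowledged deliveries on the same channel.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spring Integration or of the question's code.
public class DeliveryTags {
    // Header name as quoted in the question.
    public static final String DELIVERY_TAG = "amqp_deliveryTag";

    // Returns the highest delivery tag found across the headers of a batch,
    // or -1 if no message in the batch carries a numeric delivery tag.
    public static long highest(List<Map<String, Object>> headersOfBatch) {
        long max = -1L;
        for (Map<String, Object> headers : headersOfBatch) {
            Object tag = headers.get(DELIVERY_TAG);
            if (tag instanceof Number) {
                max = Math.max(max, ((Number) tag).longValue());
            }
        }
        return max;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> batch = Arrays.asList(
                Collections.<String, Object>singletonMap(DELIVERY_TAG, 7L),
                Collections.<String, Object>singletonMap(DELIVERY_TAG, 12L),
                Collections.<String, Object>singletonMap("other", "x"));
        System.out.println(highest(batch)); // prints 12
    }
}
```

This only holds if every message in the batch was delivered on the same channel, since delivery tags are scoped per channel.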

Services

I am just using basic configurations:

chain-B

<int:chain input-channel="channel2" output-channel="channel3">
        <int:header-enricher>
            <int:error-channel ref="errorChannel" /> <!-- Probably useless -->
        </int:header-enricher>
        <int:json-to-object-transformer/>
        <int:transformer    ref="serviceABean" 
                            method="doThis" />
        <int:transformer    ref="serviceBBean" 
                            method="doThat" />
    </int:chain>

service-D

<int:service-activator  ref="serviceDBean"
                            method="doSomething"
                            input-channel="channel4"
                            output-channel="channel5" />

Error management

As I rely on MANUAL acknowledgement, I need to manually reject messages as well in case an exception occurs. I have the following definition for inbound-channel-adapter A:

<int-amqp:inbound-channel-adapter   channel="channel2"
                                            queue-names="si.queue1"
                                            error-channel="errorChannel"
                                            mapped-request-headers="*"
                                            acknowledge-mode="MANUAL"
                                            prefetch-count="${properties.prefetch_count}"
                                            connection-factory="rabbitConnectionFactory"/>

I use the following definition for errorChannel:

<int:chain input-channel="errorChannel">
            <int:transformer ref="errorUnwrapperBean" method="unwrap" />
            <int:service-activator ref="amqpAcknowledgerBean" method="rejectMessage" />
</int:chain>

ErrorUnwrapper is based on this code and the whole exception detection and message rejection works well until messages reach aggregator C.

Problem

If an exception is raised while processing the messages in service-activator D, I see the exception in the logs, but errorChannel does not seem to receive any message, and my ErrorUnwrapper unwrap() method is not called. The trimmed stack traces I see when an Exception("ahahaha") is thrown are as follows:

2014-09-23 16:41:18,725 ERROR o.s.i.s.SimpleMessageStore:174: Exception in expiry callback
org.springframework.messaging.MessageHandlingException: java.lang.Exception: ahahaha
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:78)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:71)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:170)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
(...)

Caused by: java.lang.Exception: ahahaha
    at com.myapp.ServiceD.doSomething(ServiceD.java:153)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
(...)

2014-09-23 16:41:18,733 ERROR o.s.s.s.TaskUtils$LoggingErrorHandler:95: Unexpected error occurred in scheduled task.
org.springframework.messaging.MessageHandlingException: java.lang.Exception: ahahaha
(...)

Question

How can one tell the services that process messages coming from such an aggregator to publish errors to errorChannel? I tried to set the error channel in the message headers via a header-enricher, with no luck. I am using the default errorChannel definition, but I also tried changing its name and redefining it. I am clueless here, and even though I found this and that, I have not managed to get it to work. Thanks in advance for your help!

Nicolas asked Nov 10 '22 01:11


1 Answer

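As the stack trace shows, your process is started from the MessageGroupStoreReaper thread, which is initiated from the default ThreadPoolTaskScheduler. The log shows the exception ending up in TaskUtils$LoggingErrorHandler, which only logs it, so nothing is published to errorChannel.

So you must provide a custom scheduler bean whose error handler publishes to errorChannel: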

<bean id="scheduler" class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
    <property name="errorHandler">
        <bean class="org.springframework.integration.channel.MessagePublishingErrorHandler">
            <property name="defaultErrorChannel" ref="errorChannel"/>
        </bean>
    </property>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="messageStoreReaperBean" method="run" fixed-rate="2000" />
</task:scheduled-tasks>

That said, I can see the benefit of having an error-channel on the <aggregator> itself, since it really has several entry points running on different detached threads, which we can't deal with in the normal way.
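The effect of giving the scheduler an error handler that publishes to a channel can be sketched in plain Java, without any Spring types. This is only an analogy under stated assumptions: ErrorRoutingTask is a hypothetical class, and the BlockingQueue stands in for errorChannel. The point it shows is that an exception thrown on the scheduler's thread has no caller to propagate to, so it must be caught around the task and handed to an error sink explicitly.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical wrapper, not a Spring class: runs a task and forwards any
// runtime exception to an error sink instead of letting it escape.
public class ErrorRoutingTask implements Runnable {
    private final Runnable delegate;
    private final BlockingQueue<Throwable> errorSink; // stand-in for errorChannel

    public ErrorRoutingTask(Runnable delegate, BlockingQueue<Throwable> errorSink) {
        this.delegate = delegate;
        this.errorSink = errorSink;
    }

    @Override
    public void run() {
        try {
            delegate.run();
        } catch (RuntimeException e) {
            // Publish the failure so that an error flow can react to it.
            errorSink.offer(e);
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Throwable> errors = new LinkedBlockingQueue<>();
        // Simulates the reaper forcing a group release whose downstream service fails.
        Runnable reaper = () -> { throw new IllegalStateException("ahahaha"); };
        new ErrorRoutingTask(reaper, errors).run();
        System.out.println(errors.peek().getMessage()); // prints ahahaha
    }
}
```

In the XML above, the scheduler's errorHandler property plays the role of the catch block and errorChannel plays the role of the queue.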

Artem Bilan answered Dec 21 '22 17:12