Error handling practices in spring integration flow

I have a Spring Integration flow that involves asynchronous execution: the gateway returns a value to the controller, and the integration flow continues after that value has been returned.

Here is the gateway:

@MessagingGateway
public interface GW {

    @Gateway(requestChannel = "f.input")
    Task input(Collection<MessengerIncomingRequest> messages);

}

And here is the flow:

@Bean
IntegrationFlow jFlow() {
    return IntegrationFlows.from(
            MessageChannels.executor("f.input", executor()))
            .split()
            .channel(MessageChannels.executor(executor()))
            .transform(transformer)
            .channel(routerChannel())
            .get();
}

@Bean
ThreadPoolTaskExecutor executor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        ...
        return pool;
}

@Bean
MessageChannel routerChannel() {
        return MessageChannels
        .publishSubscribe("routerChannel", executor())
        .get();
}

@Bean
IntegrationFlow routerChannelFlow() {
    return IntegrationFlows
            .from(routerChannel())
            .publishSubscribeChannel(s -> s
                    .subscribe(f -> f.bridge(null))
                    .subscribe(process()))
            .get();
}

@Bean
IntegrationFlow process() {
    return f ->
            f.route(p -> p.getKind().name(),
                    m -> m.suffix("Channel")
                            .channelMapping(TaskKind.CREATE.name(), "create")
        ....
}

@Bean
IntegrationFlow createFlow() {
        return IntegrationFlows.from(
        MessageChannels.direct("createChannel"))
        .handle(routerService)
        .get();
}

How can I define an error handler for the whole flow? What are the best practices? I know I can wrap the gateway method call in a try/catch block, but that only catches exceptions thrown in jFlow, in the steps that come before channel(routerChannel()).

How can I handle errors for the rest of the flow? Or for the entire flow?

UPDATE

I added an error handler to the publishSubscribeChannel:

@Bean
IntegrationFlow routerChannelFlow() {
    return IntegrationFlows
            .from(routerChannel())
            .publishSubscribeChannel(s -> s
                    .subscribe(f -> f.bridge(null))
                    .subscribe(process())
                    .errorHandler(errorHandler))
            .get();
}

but it does not seem to help: when an exception occurs, I get the following log message:

cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has already received a reply:ErrorMessage [payload=org.springframework.messaging.MessageHandlingException:

and my error handler does not get called.

UPDATE

Following Gary's answer, I tried this code:

@Bean
IntegrationFlow jFlow() {
    return IntegrationFlows.from(
            MessageChannels.executor("f.input", executor()))
            .split()
            .channel(MessageChannels.executor(executor()))
            .transform(transformer)
            .channel(routerChannel())
            .get();
}

@Bean
IntegrationFlow exceptionOrErrorFlow() {
    return IntegrationFlows.from(
            MessageChannels.direct("exceptionChannel"))
            .handle(errorHandler, "handleError")
            .get();
}

@Bean
MessageChannel exceptionChannel() {
    return MessageChannels.direct("exceptionChannel")
            .get();
}

@Bean
IntegrationFlow process() {
    return f -> f
            .enrichHeaders(spec ->
                    spec.header("errorChannel", "exceptionChannel", true))
            .route(p -> p.getKind().name(),
                    m -> m.suffix("Channel")
                            .channelMapping(TaskKind.CREATE.name(), "create")
        ....
}

@MessagingGateway(errorChannel = "exceptionChannel")

After another edit, I added exceptionChannel to the gateway and moved the header enrichment to the second (async) leg of my flow. The controller still blocks if an exception is thrown in the synchronous part of the flow.

Sergei Ledvanov asked Mar 27 '16 20:03


1 Answer

First of all, let me explain how the gateway works - it should help with understanding the solution below.

The request message gets a unique temporary reply channel which is added as the replyChannel header. Even if the gateway has an explicit replyChannel, that is simply bridged to the request's replyChannel - that's how the gateway correlates the reply to the request.

Now, the gateway also sets the request's errorChannel header to the same reply channel. That way, even if the flow is asynchronous, an exception can be routed back to the gateway and either thrown to the caller or routed to the gateway's error channel (if specified). This routing is performed by a MessagePublishingErrorHandler, which is wired into an ErrorHandlingTaskExecutor that wraps your executor.

Since you are returning a result to the gateway and then continuing, that gateway interaction is "spent" and nothing will ever receive a message (including an exception) sent to the replyChannel header. Hence the log message you are seeing.

So, one solution is to fix up the errorChannel header on the message sent to the independent flow. Use .enrichHeaders to replace (be sure to set overwrite to true) the errorChannel header that was set up by the gateway. This should be done as soon as possible in the flow so any exceptions will be routed to that channel (and then you can subscribe your error handler there).

An alternative solution is to wire up your own error handling executor, explicitly setting a defaultErrorChannel on its MessagePublishingErrorHandler, and to remove the errorChannel header from the message.
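A minimal sketch of that alternative, assuming the executor() and exceptionChannel() beans from the question are available for injection. The class name ErrorRoutingConfig and the bean name errorHandlingExecutor are hypothetical, and the sketch covers only the executor: the errorChannel header still has to be removed from the message separately, otherwise the header keeps taking precedence.

```java
import java.util.concurrent.Executor;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
class ErrorRoutingConfig {

    // Hypothetical bean: wraps the question's thread pool so that a failure on
    // a pooled thread is published to exceptionChannel whenever the failed
    // message carries no errorChannel header.
    @Bean
    Executor errorHandlingExecutor(ThreadPoolTaskExecutor executor,
            @Qualifier("exceptionChannel") MessageChannel exceptionChannel) {
        MessagePublishingErrorHandler errorHandler = new MessagePublishingErrorHandler();
        errorHandler.setDefaultErrorChannel(exceptionChannel);
        return new ErrorHandlingTaskExecutor(executor, errorHandler);
    }
}
```

The wrapped executor would then be passed wherever the question passes executor(), for example to MessageChannels.publishSubscribe("routerChannel", ...).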

The async error routing first looks for an errorChannel header; if one is present, the error message is routed there. If there is no header and the MPEH has no default error channel, the message is routed to the default errorChannel, to which (normally) a LoggingChannelAdapter is subscribed. The default errorChannel is a pub/sub channel, so you can subscribe other endpoints to it.
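The lookup order described above can be modelled in plain Java. This is a simplified illustration of the decision only, not Spring Integration's actual code; the class ErrorChannelResolution, its resolve method, and the channel name poolErrors are made up for the example.

```java
import java.util.Map;

class ErrorChannelResolution {

    // Returns the name of the channel an async failure would be routed to.
    static String resolve(Map<String, Object> headers, String handlerDefaultChannel) {
        Object header = headers.get("errorChannel");
        if (header != null) {
            return header.toString();       // 1. header on the failed message wins
        }
        if (handlerDefaultChannel != null) {
            return handlerDefaultChannel;   // 2. default set on the error handler
        }
        return "errorChannel";              // 3. global pub/sub errorChannel
    }

    public static void main(String[] args) {
        // Header present: it wins even though the handler has a default.
        System.out.println(resolve(Map.of("errorChannel", "exceptionChannel"), "poolErrors"));
        // No header: the handler's default channel is used.
        System.out.println(resolve(Map.of(), "poolErrors"));
        // Neither: the failure falls through to the global errorChannel.
        System.out.println(resolve(Map.of(), null));
    }
}
```

This is why replacing the header and configuring a default on the handler are alternatives: as long as the gateway's header is still on the message, the handler's default is never consulted.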

EDIT

You are changing the channel before the pub/sub.

It's important to get at least one response to the gateway; you should leave the error channel alone on one leg of the pub/sub and update it on the second leg. That way, an exception on the first leg will be thrown to the caller (you can add an errorChannel to the gateway if you want to take some action there, such as routing to your exception handler). You must only update the header on the second leg so that its exceptions go straight to your error handler.

If you set the errorChannel on the gateway to your exceptionChannel then exceptions on both legs will go there.
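As a sketch of the two-leg arrangement, assuming the routerChannel and exceptionChannel beans from the question: the class name RouterFlowConfig and the channel name processChannel (standing in for the input of the routing flow) are hypothetical, and the DSL calls are the ones already used in the question.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageHeaders;

@Configuration
class RouterFlowConfig {

    @Bean
    IntegrationFlow routerChannelFlow() {
        return IntegrationFlows
                .from("routerChannel")
                .publishSubscribeChannel(s -> s
                        // First leg: the errorChannel header is left alone, so a
                        // reply or an exception still reaches the waiting gateway.
                        .subscribe(f -> f.bridge(null))
                        // Second leg: replace the gateway's errorChannel header
                        // before any further processing happens on this leg.
                        .subscribe(f -> f
                                .enrichHeaders(h -> h.header(
                                        MessageHeaders.ERROR_CHANNEL, "exceptionChannel", true))
                                .channel("processChannel")))
                .get();
    }
}
```

The header is overwritten only inside the second subscriber, which is what keeps the gateway interaction on the first leg intact.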

Gary Russell answered Sep 27 '22 18:09