I have a spring integration flow that involves async execution, returning value from gateway to controller, continuing integration flow after returning a value.
Here is the gateway:
@MessagingGateway
public interface GW {
@Gateway(requestChannel = "f.input")
Task input(Collection<MessengerIncomingRequest> messages);
}
And here is the flow:
@Bean
IntegrationFlow jFlow() {
return IntegrationFlows.from(
MessageChannels.executor("f.input", executor()))
.split()
.channel(MessageChannels.executor(executor()))
.transform(transformer)
.channel(routerChannel())
.get();
}
@Bean
ThreadPoolTaskExecutor executor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
...
return pool;
}
@Bean
MessageChannel routerChannel() {
return MessageChannels
.publishSubscribe("routerChannel", executor())
.get();
}
@Bean
IntegrationFlow routerChannelFlow() {
return IntegrationFlows
.from(routerChannel())
.publishSubscribeChannel(s -> s
.subscribe(f -> f.bridge(null))
.subscribe(process()))
.get();
}
@Bean
IntegrationFlow process() {
return f ->
f.route(p -> p.getKind().name(),
m -> m.suffix("Channel")
.channelMapping(TaskKind.CREATE.name(), "create")
....
}
@Bean
IntegrationFlow createFlow() {
return IntegrationFlows.from(
MessageChannels.direct("createChannel"))
.handle(routerService)
.get();
}
How can I define an error handler for the whole flow? What are the best practices? I know I can put a try/catch block for the gateway method call, but it will only catch exceptions that occur in jFlow
flow for everything that comes before channel(routerChannel())
.
How can I handle errors for the rest of the flow? Or for the entire flow?
UPDATE
I added error handler for publishSubscribeChannel
@Bean
IntegrationFlow routerChannelFlow() {
return IntegrationFlows
.from(routerChannel())
.publishSubscribeChannel(s -> s
.subscribe(f -> f.bridge(null))
.subscribe(process())
.errorHandler(errorHandler))
.get();
}
but it does not seem to help, because in case of exception I get the following error:
cMessagingTemplate$TemporaryReplyChannel : Reply message received but the receiving thread has already received a reply:ErrorMessage [payload=org.springframework.messaging.MessageHandlingException:
and my error handler does not get called.
UPDATE
According to Gary's answer I tried this code:
@Bean
IntegrationFlow jFLow() {
return IntegrationFlows.from(
MessageChannels.executor("f.input", executor()))
.split()
.channel(MessageChannels.executor(executor()))
.transform(transformer)
.channel(routerChannel())
.get();
}
@Bean
IntegrationFlow exceptionOrErrorFlow() {
return IntegrationFlows.from(
MessageChannels.direct("exceptionChannel"))
.handle(errorHandler, "handleError")
.get();
}
@Bean
MessageChannel exceptionChannel() {
return MessageChannels.direct("exceptionChannel")
.get();
}
@Bean
IntegrationFlow process() {
return f ->
f.enrichHeaders((spec) ->
spec.header("errorChannel", "exceptionChannel", true))
f.route(p -> p.getKind().name(),
m -> m.suffix("Channel")
.channelMapping(TaskKind.CREATE.name(), "create")
....
}
@MessagingGateway(errorChannel = "exceptionChannel")
After another edit I added exceptionChannel
to the gateway, and moved enriching header to the second leg (async) of my flow. Still controller gets blocked if exception is throw in the synchronous part of the flow.
First of all, let me explain how the gateway works - it should help with understanding the solution below.
The request message gets a unique temporary reply channel which is added as the replyChannel
header. Even if the gateway has an explicit replyChannel
, that is simply bridged to the request's replyChannel
- that's how the gateway correlates the reply to the request.
Now, the gateway also sets the request's errorChannel
header to the same reply channel. That way, even if the flow is asynchronous, an exception can be routed back to the gateway and either thrown to the caller or routed to the gateway's error channel (if specified). This routing is performed by a MessagePublishingErrorHandler
which is wired into a ErrorHandlingTaskExecutor
, which wraps your executor.
Since you are returning a result to the gateway and then continuing; that gateway interaction is "spent" and nothing will ever receive a message (including an exception) sent to the replyChannel
header. Hence the log message you are seeing.
So, one solution is to fix up the errorChannel
header on the message sent to the independent flow. Use .enrichHeaders
to replace (be sure to set overwrite to true) the errorChannel
header that was set up by the gateway. This should be done as soon as possible in the flow so any exceptions will be routed to that channel (and then you can subscribe your error handler there).
An alternative solution is to wire up your own error handling executor, explicitly setting a defaultErrorChannel
on its MessagePublishingErrorHandler
and remove the errorChannel
header.
The async error routing first looks for a header; if present, the error message is routed there; if there's no header and the MPEH has no default error channel; the message will be routed to the default errorChannel
to which (normally) there is a LoggingChannelAdapter
subscribed. The default errorChannel
is a pub/sub channel so you can subscribe other endpoints to it.
EDIT
You are changing the channel before the pub/sub.
It's important to get at least one response to the gateway; you should leave the error channel alone on one leg of the pub/sub and update it on the second leg. That way, an exception on the first leg will be thrown to the caller (you can add an errorChannel
to the gateway if you want to take some action there, such as routing to your exception handler). You must only update the header on the second leg so that its exceptions go straight to your error handler.
If you set the errorChannel
on the gateway to your exceptionChannel
then exceptions on both legs will go there.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With