
Spring Cloud Stream @ServiceActivator not messaging to errorChannel on exception

I am using Spring Cloud Stream with spring-cloud-starter-stream-kafka. I have bound my channels to Kafka topics in application.properties as follows:

spring.cloud.stream.bindings.gatewayOutput.destination=received
spring.cloud.stream.bindings.enrichingInput.destination=received
spring.cloud.stream.bindings.enrichingOutput.destination=enriched
spring.cloud.stream.bindings.redeemingInput.destination=enriched
spring.cloud.stream.bindings.redeemingOutput.destination=redeemed
spring.cloud.stream.bindings.fulfillingInput.destination=redeemed
spring.cloud.stream.bindings.error.destination=errors12
spring.cloud.stream.bindings.errorInput.destination=errors12
spring.cloud.stream.bindings.errorOutput.destination=errors12

I am unable to get my program to send an exception message to the error channel. Surprisingly, it doesn't even seem to try, even though the failure happens on a different thread (I have a @MessagingGateway that puts a message on gatewayOutput, and the rest of the flow happens asynchronously). Here is the definition of my service activator:

@Named
@Configuration
@EnableBinding(Channels.class)
@EnableIntegration
public class FulfillingServiceImpl extends AbstractBaseService implements
        FulfillingService {

    @Override
    @Audit(value = "annotatedEvent")
    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT, requiresReply = "false")
    public void fulfill(TrivialRedemption redemption) throws Exception {

        logger.error("FULFILLED!!!!!!");

        throw new Exception("test exception");

    }
}

Here is the log produced (I have truncated the full exception). There is no...

  • Complaint about errorChannel not having any subscriber
  • Kafka producer thread logging
2016-05-13 12:13:14 pool-6-thread-1 DEBUG KafkaMessageChannelBinder$ReceivingHandler:115 - org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ReceivingHandler@2b461688 received message: GenericMessage [payload=byte[400], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DirectChannel:430 - preSend on channel 'fulfillingInput', message: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90[endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG ServiceActivatingHandler:115 - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@64bce7ab] (fulfillingServiceImpl.fulfill.serviceActivator.handler) received message: GenericMessage [payload=com.test.system.poc.model.v3.TrivialRedemption@2581ed90[endpoints=[com.test.system.poc.model.v3.Breadcrumb@21be7df8],orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f,systemCategory=DEMO,systemSubCategory=,properties=,monetaryRedemptionAmount=456.78], headers={kafka_offset=17, kafka_messageKey=null, kafka_topic=redeemed, kafka_partitionId=0, kafka_nextOffset=18, contentType=application/x-java-object;type=com.test.system.poc.model.v3.TrivialRedemption}] - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationEvaluationContext' - {}
2016-05-13 12:13:14 pool-6-thread-1 DEBUG DefaultListableBeanFactory:251 - Returning cached instance of singleton bean 'integrationConversionService' - {}
2016-05-13 12:13:14 pool-6-thread-1 ERROR FulfillingServiceImpl$$EnhancerBySpringCGLIB$$9dad62:42 - FULFILLED!!!!!! - {}
2016-05-13 12:13:14 pool-6-thread-1 ERROR LoggingErrorHandler:35 - Error while processing: KafkaMessage [Message(magic = 0, attributes = 0, crc = 3373691507, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=400 cap=400]), KafkaMessageMetadata [offset=17, nextOffset=18, Partition[topic='redeemed', id=0]] - {}
...
...
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='enriched', id=0]@18 - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='redeemed', id=0]@18 - {}
2016-05-13 12:13:14 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE DefaultConnection:126 - Reading from Partition[topic='errors12', id=0]@0 - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferReceive:36 - 40 bytes read. - {}
2016-05-13 12:13:15 kafka-fetch-1 TRACE BoundedByteBufferSend:36 - 60 bytes written. - {}

EDIT: Here is the content of my Channels class:

public interface Channels {

    public static final String GATEWAY_OUTPUT = "gatewayOutput";

    public static final String ENRICHING_INPUT = "enrichingInput";
    public static final String ENRICHING_OUTPUT = "enrichingOutput";

    public static final String REDEEMING_INPUT = "redeemingInput";
    public static final String REDEEMING_OUTPUT = "redeemingOutput";

    public static final String FULFILLING_INPUT = "fulfillingInput";
    public static final String FULFILLING_OUTPUT = "fulfillingOutput";

    @Output(GATEWAY_OUTPUT)
    MessageChannel gatewayOutput();

    @Input(ENRICHING_INPUT)
    MessageChannel enrichingInput();

    @Output(ENRICHING_OUTPUT)
    MessageChannel enrichingOutput();

    @Input(REDEEMING_INPUT)
    MessageChannel redeemingInput();

    @Output(REDEEMING_OUTPUT)
    MessageChannel redeemingOutput();

    @Input(FULFILLING_INPUT)
    MessageChannel fulfillingInput();

    @Output(FULFILLING_OUTPUT)
    MessageChannel fulfillingOutput();
}

Louis Alexander asked Dec 05 '25 07:12



1 Answer

You don't show your Channels class, but the binder doesn't know that your "error" channels are "special": it binds them to the errors12 topic like any other channel and does not route exceptions to them.

The binder can be configured with retry and to route exceptions to a dead-letter topic; see this PR, which is included in 1.0.0.RELEASE.
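
As a rough sketch of that first option, the binder-level settings for the fulfillingInput binding might look like the following. The property names are as I recall them from the Spring Cloud Stream and Kafka binder reference for the 1.0.x line, and the group name is made up for illustration, so check both against the documentation for your version.

```properties
# Hypothetical consumer group for this binding (the name is an assumption)
spring.cloud.stream.bindings.fulfillingInput.group=fulfillment
# Number of delivery attempts before the binder gives up on a message
spring.cloud.stream.bindings.fulfillingInput.consumer.maxAttempts=3
# Kafka binder only: forward messages that still fail to a dead-letter topic
spring.cloud.stream.kafka.bindings.fulfillingInput.consumer.enableDlq=true
```

With settings like these, the application does not publish to errors12 itself; the binder forwards messages that still fail after the retries to a dead-letter topic whose name it derives from the destination and group.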

Alternatively, you can add a "mid-flow" gateway before the service activator - think of it like a "try/catch" block in Java:

@MessageEndpoint
public static class GatewayInvoker {

    @Autowired
    private ErrorHandlingGateway gw;

    @ServiceActivator(inputChannel = Channels.FULFILLING_INPUT)
    public void send(Message<?> message) {
        this.gw.send(message);
    }

}

@Bean
public GatewayInvoker gate() {
    return new GatewayInvoker();
}

@MessagingGateway(defaultRequestChannel = "toService", errorChannel = Channels.ERRORS)
public interface ErrorHandlingGateway {

    void send(Message<?> message);

}

Change your service activator's input channel to toService.

You have to add @IntegrationComponentScan to your configuration class so the framework can detect the @MessagingGateway interface and build a proxy for it.
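
To make the "try/catch" analogy concrete, here is a minimal plain-Java model of what the mid-flow gateway does. It uses no Spring classes; Handler, ErrorRoutingGateway and MidFlowGatewaySketch are hypothetical names invented for this sketch, and it is not how the framework is implemented internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the "mid-flow gateway as try/catch" idea (no Spring involved).
public class MidFlowGatewaySketch {

    /** Hypothetical stand-in for a handler such as fulfill(); it may throw. */
    interface Handler<T> {
        void handle(T payload) throws Exception;
    }

    /** Plays the role of the gateway: calls downstream and diverts failures. */
    static final class ErrorRoutingGateway<T> {
        private final Handler<T> downstream;
        private final Consumer<Exception> errorChannel;

        ErrorRoutingGateway(Handler<T> downstream, Consumer<Exception> errorChannel) {
            this.downstream = downstream;
            this.errorChannel = errorChannel;
        }

        void send(T payload) {
            try {
                // Corresponds to sending on the gateway's request channel ("toService").
                downstream.handle(payload);
            } catch (Exception e) {
                // Corresponds to the gateway's errorChannel attribute.
                errorChannel.accept(e);
            }
        }
    }

    public static void main(String[] args) {
        List<String> errors = new ArrayList<>();
        ErrorRoutingGateway<String> gateway = new ErrorRoutingGateway<>(
                payload -> { throw new Exception("test exception"); },
                e -> errors.add(e.getMessage()));

        gateway.send("order-1");
        System.out.println(errors); // prints [test exception]
    }
}
```

In the real framework the error channel receives an ErrorMessage whose payload is a MessagingException wrapping the original exception, rather than the raw exception used here.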

EDIT

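Another alternative just suggested to me would be to add an ExpressionEvaluatingRequestHandlerAdvice to your service activator's advice chain. The advice evaluates an expression when the handler throws and can send the result to a failure channel.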

Gary Russell answered Dec 07 '25 20:12
