I am using Spring Integration to read files from a directory using following configuration. However I am looking to stop poller once I found any file until service not restarted again. Is there any way I can change poller delay at runtime OR start/stop Poller at runtime?
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(channel = "fileInputChannel", poller = @Poller(cron = "0 0/10 19-23,0-6 ? * *", maxMessagesPerPoll = "1"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
File directory = new File(localFtpDirectory);
if (clearLocalDir && directory.isDirectory() && directory.exists()) {
LOG.info("Clear directory {} on startup of service", directory);
Arrays.stream(directory.listFiles()).forEach(File::delete);
}
source.setDirectory(directory);
source.setFilter(new LastModifiedFileFilter(remoteFileFilter));
return source;
}
@Bean
@ServiceActivator(inputChannel = "fileInputChannel")
public MessageHandler fileHandler() {
return new MessageHandlerService();
}
I had similar requirement, but was not satisfied with the accepted answer. The start/stop functionality mentioned here is defined by the Lifecycle/SmartLifecycle
contract, and the typical scenario I can see is application shutdown or application context refresh. Simply, when I naively tried to use this I indeed got into problems very quickly.
My requirement was rather to pause the poller and resume it later on. This can be easily achieved by the advice and is very easy to implement. However, getting this knowledge was not easy and I ended up with framework source code investigation (indeed I might have overlooked something in the doc).
@Bean
public MessagePollingControlAdvice messagePollingControlAdvice() {
return new MessagePollingControlAdvice();
}
//call this method for your DSL bridge configuration etc..
Consumer<GenericEndpointSpec<BridgeHandler>> pollerConfiguration() {
return b -> b.poller(pollerFactory -> pollerFactory.advice(messagePollingControlAdvice()));
}
public static class MessagePollingControlAdvice implements ReceiveMessageAdvice {
private volatile boolean pollingActive = false;
@Override
public boolean beforeReceive(Object source) {
return pollingActive;
}
@Override
public Message<?> afterReceive(Message<?> result, Object source) {
return result;
}
public boolean isPollingActive() {
return pollingActive;
}
//call this method from whatever place in your code to activate/deactivate poller
public void setPollingActive(boolean pollingActive) {
this.pollingActive = pollingActive;
}
}
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With