I have setup File poller with task executor
ExecutorService executorService = Executors.newFixedThreadPool(10);
LOG.info("Setting up the poller for directory {} ", finalDirectory);
StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
.taskExecutor(executorService)
.maxMessagesPerPoll(10)
.advice(new LoggerSourceAdvisor(finalDirectory))
))
//move file to processing first processing
.transform(new FileMoveTransformer("C:/processing", true))
.channel("fileRouter")
.get();
As seen I have setup fixed threadpool
of 10 and maximum message 10 per poll. If I put 10 files it still processes one by one. What could be wrong here ?
* UPDATE *
It works perfectly fine after Gary's answer though I have other issue now.
I have setup my Poller like this
setDirectory(new File(path));
DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();
scanner.setFilter(new AcceptAllFileListFilter<>());
setScanner(scanner);
The reason of using AcceptAll
because the same file may come again that's why I sort of move the file first. But when I enable the thread executor the same file is being processed by mutliple threads, I assume because of AcceptAllFile
If I Change to AcceptOnceFileListFilter
it works but then the same file that comes again will not be picked up again ! What can be done to avoid this issue ?
Issue/Bug
In Class AbstractPersistentAcceptOnceFileListFilter
We have this code
@Override
public boolean accept(F file) {
String key = buildKey(file);
synchronized (this.monitor) {
String newValue = value(file);
String oldValue = this.store.putIfAbsent(key, newValue);
if (oldValue == null) { // not in store
flushIfNeeded();
return true;
}
// same value in store
if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
flushIfNeeded();
return true;
}
return false;
}
}
Now for example if I have setup max per poll 5 and there are two files then its possible same file would be picked up by two threads.
Lets say my code moves the files once I read it.
But the other thread gets to the accept
method
if the file is not there then it will return lastModified time as 0 and it will return true.
That causes the issue because the file is NOT there.
If its 0 then it should return false as the file is not there anymore.
The TaskExecutor was originally created to give other Spring components an abstraction for thread pooling where needed. Components such as the ApplicationEventMulticaster , JMS's AbstractMessageListenerContainer , and Quartz integration all use the TaskExecutor abstraction to pool threads.
Interface TaskExecutorSimple task executor interface that abstracts the execution of a Runnable . Implementations can use all sorts of different execution strategies, such as: synchronous, asynchronous, using a thread pool, and more.
Spring also features implementations of those interfaces that support thread pools or delegation to CommonJ within an application server environment.
When you add a task executor to a poller; all that does is the scheduler thread hands the poll task off to a thread in the thread pool; the maxMessagesPerPoll
is part of the poll task. The poller itself only runs once every 5 seconds. To get what you want, you should add an executor channel to the flow...
@SpringBootApplication
public class So53521593Application {
private static final Logger logger = LoggerFactory.getLogger(So53521593Application.class);
public static void main(String[] args) {
SpringApplication.run(So53521593Application.class, args);
}
@Bean
public IntegrationFlow flow() {
ExecutorService exec = Executors.newFixedThreadPool(10);
return IntegrationFlows.from(() -> "foo", e -> e
.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
.maxMessagesPerPoll(10)))
.channel(MessageChannels.executor(exec))
.<String>handle((p, h) -> {
try {
logger.info(p);
Thread.sleep(10_000);
}
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
return null;
})
.get();
}
}
EDIT
It works fine for me...
@Bean
public IntegrationFlow flow() {
ExecutorService exec = Executors.newFixedThreadPool(10);
return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
.maxMessagesPerPoll(10)))
.channel(MessageChannels.executor(exec))
.handle((p, h) -> {
try {
logger.info(p.toString());
Thread.sleep(10_000);
}
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
return null;
})
.get();
}
and
2018-11-28 11:46:05.196 INFO 57607 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt
2018-11-28 11:46:05.197 INFO 57607 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt
and with touch test1.txt
2018-11-28 11:48:00.284 INFO 57607 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt
EDIT1
Agreed - reproduced with this...
@Bean
public IntegrationFlow flow() {
ExecutorService exec = Executors.newFixedThreadPool(10);
return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
.maxMessagesPerPoll(10)))
.channel(MessageChannels.executor(exec))
.<File>handle((p, h) -> {
try {
p.delete();
logger.info(p.toString());
Thread.sleep(10_000);
}
catch (InterruptedException e1) {
Thread.currentThread().interrupt();
}
return null;
})
.get();
}
and
2018-11-28 13:22:23.689 INFO 75681 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt
2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt
2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt
2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-4] com.example.So53521593Application : /tmp/foo/test2.txt
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With