
TaskExecutor is not working in Spring Integration

I have set up a file poller with a task executor:

ExecutorService executorService = Executors.newFixedThreadPool(10);

            LOG.info("Setting up the poller for directory {} ", finalDirectory);
            StandardIntegrationFlow standardIntegrationFlow = IntegrationFlows.from(new CustomFileReadingSource(finalDirectory),
                    c -> c.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS, 5)
                            .taskExecutor(executorService)
                            .maxMessagesPerPoll(10)
                            .advice(new LoggerSourceAdvisor(finalDirectory))
                    ))
                    // move the file to the processing directory first
                    .transform(new FileMoveTransformer("C:/processing", true))
                    .channel("fileRouter")
                    .get();

As shown, I have set up a fixed thread pool of 10 and a maximum of 10 messages per poll. If I drop in 10 files, they are still processed one by one. What could be wrong here?

* UPDATE *

It works perfectly after Gary's answer, though I now have another issue.

I have set up my poller like this:

setDirectory(new File(path));
        DefaultDirectoryScanner scanner = new DefaultDirectoryScanner();

        scanner.setFilter(new AcceptAllFileListFilter<>());
        setScanner(scanner);

The reason for using AcceptAllFileListFilter is that the same file may arrive again, which is why I move the file first. But when I enable the thread executor, the same file is processed by multiple threads, I assume because of AcceptAllFileListFilter.

If I change to AcceptOnceFileListFilter it works, but then a file that arrives again is not picked up. What can be done to avoid this issue?

Issue/Bug

In the class AbstractPersistentAcceptOnceFileListFilter we have this code:

@Override
    public boolean accept(F file) {
        String key = buildKey(file);
        synchronized (this.monitor) {
            String newValue = value(file);
            String oldValue = this.store.putIfAbsent(key, newValue);
            if (oldValue == null) { // not in store
                flushIfNeeded();
                return true;
            }
            // same value in store
            if (!isEqual(file, oldValue) && this.store.replace(key, oldValue, newValue)) {
                flushIfNeeded();
                return true;
            }
            return false;
        }
    }

Now, for example, if I have set max messages per poll to 5 and there are two files, it is possible that the same file is picked up by two threads.

Let's say my code moves a file once it has read it, but another thread then reaches the accept method for that same file. Because the file is no longer there, its lastModified time is reported as 0, which differs from the stored value, so accept returns true.

That causes the issue, because the file is NOT there. If the value is 0, accept should return false, as the file no longer exists.
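The guard suggested above can be sketched in plain Java, without Spring. This is only an illustration under stated assumptions: the class name ExistingFileAcceptOnceSketch and its in-memory map are hypothetical stand-ins for the real filter and its metadata store, and it relies on java.io.File.lastModified() returning 0 for a file that does not exist. A file whose timestamp is genuinely the epoch would also be rejected by this check.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for an accept-once filter; not the Spring Integration class.
public class ExistingFileAcceptOnceSketch {

    // In-memory replacement for the metadata store: path -> last seen timestamp.
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();
    private final Object monitor = new Object();

    public boolean accept(File file) {
        String key = file.getAbsolutePath();
        synchronized (this.monitor) {
            long lastModified = file.lastModified();
            if (lastModified == 0L) {
                // File.lastModified() returns 0 when the file is missing:
                // reject it instead of treating it as a "changed" file.
                return false;
            }
            String newValue = Long.toString(lastModified);
            String oldValue = this.store.putIfAbsent(key, newValue);
            if (oldValue == null) {
                return true; // first time this path is seen
            }
            // Accept again only if the timestamp changed since the last accept.
            return !oldValue.equals(newValue) && this.store.replace(key, oldValue, newValue);
        }
    }

    public static void main(String[] args) throws IOException {
        File file = Files.createTempFile("accept-once", ".txt").toFile();
        ExistingFileAcceptOnceSketch filter = new ExistingFileAcceptOnceSketch();
        System.out.println(filter.accept(file)); // true: new file
        System.out.println(filter.accept(file)); // false: unchanged
        Files.delete(file.toPath());
        System.out.println(filter.accept(file)); // false: file no longer exists
    }
}
```

In a real flow the same check would presumably live in a custom filter placed in front of, or subclassing, the persistent filter; that wiring is not shown here.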

Asked by Makky, Nov 28 '18 14:11



1 Answer

When you add a task executor to a poller, all that happens is that the scheduler thread hands the poll task off to a thread in the thread pool. maxMessagesPerPoll is part of that poll task, so the messages from one poll are still handled one after another on that single thread. The poller itself only runs once every 5 seconds. To get what you want, add an executor channel to the flow:

@SpringBootApplication
public class So53521593Application {

    private static final Logger logger = LoggerFactory.getLogger(So53521593Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So53521593Application.class, args);
    }

    @Bean
    public IntegrationFlow flow() {
        ExecutorService exec = Executors.newFixedThreadPool(10);
        return IntegrationFlows.from(() -> "foo", e -> e
                    .poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                            .maxMessagesPerPoll(10)))
                .channel(MessageChannels.executor(exec))
                .<String>handle((p, h) -> {
                    try {
                        logger.info(p);
                        Thread.sleep(10_000);
                    }
                    catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                    }
                    return null;
                })
                .get();
    }
}
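The difference between the two setups can be seen with plain java.util.concurrent, independent of Spring Integration. The sketch below is only an analogy: PollerThreadingSketch and its method names are made up for illustration, with pollAsSingleTask standing in for a poller that has a task executor and dispatchPerMessage standing in for an executor channel.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration; none of these names come from Spring Integration.
public class PollerThreadingSketch {

    // Like a poller with a task executor: the whole poll is ONE task,
    // so every message is handled on the same pool thread, in sequence.
    static Set<String> pollAsSingleTask(ExecutorService pool, List<String> messages) throws Exception {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        Future<?> poll = pool.submit(() -> {
            for (String message : messages) {
                threadNames.add(Thread.currentThread().getName());
                pause(100);
            }
        });
        poll.get();
        return threadNames;
    }

    // Like an executor channel: each message is submitted to the pool on its own.
    static Set<String> dispatchPerMessage(ExecutorService pool, List<String> messages) throws Exception {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        List<Future<?>> pending = new ArrayList<>();
        for (String message : messages) {
            pending.add(pool.submit(() -> {
                threadNames.add(Thread.currentThread().getName());
                pause(100);
            }));
        }
        for (Future<?> future : pending) {
            future.get(); // wait for every message to finish
        }
        return threadNames;
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> files = Arrays.asList("a.txt", "b.txt", "c.txt", "d.txt");
        ExecutorService pollerPool = Executors.newFixedThreadPool(10);
        ExecutorService channelPool = Executors.newFixedThreadPool(10);
        try {
            System.out.println("single poll task: " + pollAsSingleTask(pollerPool, files).size() + " thread(s)");
            System.out.println("per-message:      " + dispatchPerMessage(channelPool, files).size() + " thread(s)");
        } finally {
            pollerPool.shutdown();
            channelPool.shutdown();
        }
    }
}
```

The single poll task always reports one thread, however large the pool is; the per-message variant spreads the work across several pool threads.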

EDIT

It works fine for me...

@Bean
public IntegrationFlow flow() {
    ExecutorService exec = Executors.newFixedThreadPool(10);
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
                new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
                    e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                            .maxMessagesPerPoll(10)))
            .channel(MessageChannels.executor(exec))
            .handle((p, h) -> {
                try {
                    logger.info(p.toString());
                    Thread.sleep(10_000);
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                }
                return null;
            })
            .get();
}

and

2018-11-28 11:46:05.196 INFO 57607 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 11:46:05.197 INFO 57607 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt

and after running touch test1.txt

2018-11-28 11:48:00.284 INFO 57607 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt

EDIT1

Agreed - reproduced with this...

@Bean
public IntegrationFlow flow() {
    ExecutorService exec = Executors.newFixedThreadPool(10);
    return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/foo")).filter(
                new FileSystemPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "foo")),
                    e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)
                            .maxMessagesPerPoll(10)))
            .channel(MessageChannels.executor(exec))
            .<File>handle((p, h) -> {
                try {
                    p.delete();
                    logger.info(p.toString());
                    Thread.sleep(10_000);
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                }
                return null;
            })
            .get();
}

and

2018-11-28 13:22:23.689 INFO 75681 --- [pool-1-thread-1] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-2] com.example.So53521593Application : /tmp/foo/test2.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-3] com.example.So53521593Application : /tmp/foo/test1.txt

2018-11-28 13:22:23.690 INFO 75681 --- [pool-1-thread-4] com.example.So53521593Application : /tmp/foo/test2.txt

Answered by Gary Russell, Sep 19 '22 10:09