
How to process in parallel and synchronously in Spring Integration?

Is it possible in Spring Integration to keep the channels synchronous (get an acknowledgement after sending the message) but process several messages at the same time (in parallel) without writing my own threading code (i.e. submitting workers to an ExecutorService) and waiting on them? I would like to upload files through FTP, several at a time, without creating my own threads in the code. I need to know when all files are uploaded (that is why I want it to be synchronous). Is this possible via Spring Integration configuration and, if so, how?

m.a.tomsik asked Sep 30 '22 22:09


2 Answers

Well, looks like you need some flow like:

  1. <gateway> to send the files to the channel and wait for a result as the acknowledgement

  2. <splitter> to an ExecutorChannel to process each file in parallel

  3. <int-ftp:outbound-gateway> to upload each file

  4. <aggregator> to correlate and group the results of the <int-ftp:outbound-gateway>

  5. <aggregator> should send its result to the <gateway>, which is waiting at that time.
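To make the five steps above more concrete, here is a rough plain-JDK sketch of what such a flow does conceptually: the caller blocks (gateway), the items are fanned out to a thread pool (splitter plus ExecutorChannel), and the replies are collected into one list (aggregator). This is not Spring Integration code. The class name, the processAll method and the "uploaded ..." strings are hypothetical and only stand in for the real FTP upload.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

// Plain-JDK illustration of gateway -> splitter -> executor -> aggregator.
// NOT Spring Integration code; every name here is hypothetical.
class ScatterGatherSketch {

    // Blocks until every item has been processed, then returns the results in input order.
    static <T, R> List<R> processAll(List<T> items, Function<T, R> worker, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // "split": one task per item, handed to the pool
            List<CompletableFuture<R>> futures = items.stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> worker.apply(item), pool))
                    .collect(Collectors.toList());
            // "aggregate": join() waits for each task, so the caller blocks like the gateway
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // The lambda stands in for the upload of a single file.
        List<String> replies = processAll(List.of("a.txt", "b.txt", "c.txt"),
                name -> "uploaded " + name, 3);
        System.out.println(replies);
    }
}
```

If the worker throws, join() rethrows the failure wrapped in a CompletionException, so the blocked caller learns that the batch did not complete.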

Let me know, if something isn't clear.

UPDATE

How can this be done with the Spring Integration Java DSL? Are there any examples?

Something like this:

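@Configuration
@EnableIntegration
@IntegrationComponentScan
public class FtpUploadConfiguration { // renamed: a class called Configuration clashes with @Configuration

    // assumed to be defined elsewhere as a bean
    @Autowired
    private SessionFactory<FTPFile> ftpSessionFactory;

    @Bean
    public IntegrationFlow uploadFiles() {
        return f ->
                   f.split()
                       // ExecutorChannel (step 2 above): each file is handled on a pool thread;
                       // Executors is java.util.concurrent.Executors
                       .channel(c -> c.executor(Executors.newCachedThreadPool()))
                       .handle(Ftp.outboundGateway(this.ftpSessionFactory,
                           AbstractRemoteFileOutboundGateway.Command.PUT, "'remoteDirectory'"))
                       .aggregate();
    }

}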

@MessagingGateway(defaultRequestChannel = "uploadFiles.input") 
interface FtpUploadGateway {

    List<String> upload(List<File> filesToUpload);

}

Artem Bilan answered Oct 07 '22 21:10


This is very much possible in Spring by making use of @Async task processing.

First, create a service that performs the task asynchronously. Note the @Async annotation on the performTask method: Spring detects it and runs the method asynchronously.

import java.util.concurrent.Future;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

@Service
public class AsyncTask {

    @Async
    public Future<Result> performTask(String someArgument) {
        // put the business logic here and collect the result below
        Result result = new Result(); // this is some custom bean holding your result
        return new AsyncResult<Result>(result);
    }
}

Next, create a component that invokes the above service (optional; the call can come from any existing service as well).

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class AsyncClass {

    @Autowired
    private AsyncTask asyncTask;

    public void doAsyncOperation() throws Exception {

        List<Future<Result>> futures = new ArrayList<Future<Result>>();

        for (int i = 1; i < 10; i++) {
            // Simulate multiple calls
            Future<Result> future = doAsync(String.valueOf(i));
            futures.add(future);
        }

        for (Future<Result> future : futures) {
            // fetch the result; get() blocks until that task has finished
            Result result = future.get();
            // process the result
        }
    }

    private Future<Result> doAsync(final String someArgument) {

        // this will immediately return with a placeholder Future object which
        // can be used later to fetch the result
        Future<Result> future = asyncTask.performTask(someArgument);
        return future;
    }
}
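One thing worth knowing about the loop over the futures: get() not only blocks, it also throws an ExecutionException when the task itself failed, and this applies to the Future returned by an @Async method as well. Below is a minimal plain-JDK sketch of waiting for all tasks while recording failures instead of stopping at the first one. The class name, the awaitAll method and the outcome strings are made up for illustration, and a plain ExecutorService stands in for the Spring-managed executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper: waits for every task and reports failures
// instead of aborting on the first one.
class FutureOutcomeSketch {

    static List<String> awaitAll(List<Callable<String>> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<String> outcomes = new ArrayList<>();
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (Callable<String> task : tasks) {
                futures.add(pool.submit(task)); // returns immediately
            }
            for (Future<String> future : futures) {
                try {
                    outcomes.add("OK: " + future.get()); // blocks until this task ends
                } catch (ExecutionException e) {
                    // the task itself threw; the original exception is the cause
                    outcomes.add("FAILED: " + e.getCause().getMessage());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the interrupt flag
                    outcomes.add("INTERRUPTED");
                }
            }
        } finally {
            pool.shutdown();
        }
        return outcomes;
    }
}
```

The outcomes come back in submission order, because the futures are read in the order they were created rather than in the order the tasks finish.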

The sample XML configuration required to enable async processing is shown below (for annotation-based configuration, use @EnableAsync):

<task:annotation-driven executor="myExecutor" />
<task:executor id="myExecutor" pool-size="30" rejection-policy="CALLER_RUNS"/>
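In case the CALLER_RUNS rejection policy is unfamiliar: when the pool cannot accept another task, the thread that submitted it runs the task itself, which slows the producer down instead of dropping work. The sketch below shows that behaviour with the JDK's ThreadPoolExecutor.CallerRunsPolicy. The pool and queue are deliberately tiny (one thread, one slot) so that the third task is rejected; these sizes and the class name are assumptions for the demo, not the values from the XML above. As far as I know, a task:executor without a queue-capacity uses an unbounded queue, so with that exact configuration the policy would rarely come into play.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the caller-runs rejection policy with a saturated pool.
class CallerRunsSketch {

    // Returns true if the overflow task ran on the submitting thread.
    static boolean overflowRunsOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        List<Thread> ranOn = new CopyOnWriteArrayList<>();
        try {
            pool.execute(() -> awaitQuietly(release)); // occupies the only worker
            pool.execute(() -> { });                   // fills the single queue slot
            // pool and queue are full: the policy runs this task on the calling thread
            pool.execute(() -> ranOn.add(Thread.currentThread()));
            return ranOn.size() == 1 && ranOn.get(0) == Thread.currentThread();
        } finally {
            release.countDown(); // let the blocked worker finish
            pool.shutdown();
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```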

For detailed documentation refer here

Bond - Java Bond answered Oct 07 '22 21:10