Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Waiting for all threads to finish in Spring Integration

I have a self-executable jar program that relies heavily on Spring Integration. The problem I am having is that the program is terminating before the other Spring beans have completely finished.

Below is a cut-down version of the code I'm using, I can supply more code/configuration if needed. The entry point is a main() method, which bootstraps Spring and starts the import process:

public static void main(String[] args) {
    ctx = new ClassPathXmlApplicationContext("flow.xml");
    DataImporter importer = (DataImporter)ctx.getBean("MyImporterBean");
    try {
        importer.startImport();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        ctx.close();
    }
}

The DataImporter contains a simple loop that fires messages to a Spring Integration gateway. This delivers an active "push" approach to the flow, rather than the common approach of polling for data. This is where my problem comes in:

public void startImport() throws Exception {
    for (Item item : items) {
        gatewayBean.publish(item);
        Thread.sleep(200); // Yield period
    }
}

For completeness, the flow XML looks something like this:

<gateway default-request-channel="inChannel" service-interface="GatewayBean" />

<splitter input-channel="inChannel" output-channel="splitChannel" />

<payload-type-router input-channel="splitChannel">
    <mapping type="Item" channel="itemChannel" />
    <mapping type="SomeOtherItem" channel="anotherChannel" />
</payload-type-router>

<outbound-channel-adapter channel="itemChannel" ref="DAOBean" method="persist" />

The flow starts and processes items effectively, but once the startImport() loop finishes the main thread terminates and tears down all the Spring Integration threads immediately. This results in a race condition, the last (n) items are not completely processed when the program terminates.

I have an idea of maintaining a reference count of the items I am processing, but this is proving to be quite complicated, since the flow often splits/routes the messages to multiple service activators - meaning it is difficult to determine if each item has "finished".

What I think I need is some way to either check that no Spring beans are still executing, or to flag that all items sent to the gateway have been completely processed before terminating.

My question is, how might I go about doing either of these, or is there a better approach to my problem I haven't thought of?

like image 327
seanhodges Avatar asked Jan 19 '12 10:01

seanhodges


People also ask

What is the method to be called to make main thread wait until the completion of other threads?

The wait() is used in with notify() and notifyAll() methods, but join() is used in Java to wait until one thread finishes its execution. wait() is mainly used for shared resources, a thread notifies other waiting thread when a resource becomes free. On the other hand join() is used for waiting a thread to die.

Can a Thread object be used to launch multiple threads?

Because Java threads run in the same memory space, they can easily communicate among themselves because an object in one thread can call a method in another thread without any overhead from the operating system. In this Tutorial we will learn how to do multi-threaded programming in Java.

How does Spring integration work?

Spring Integration enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters. Those adapters provide a higher-level of abstraction over Spring's support for remoting, messaging, and scheduling.


2 Answers

You're not using a request-response pattern here.

outbound-channel-adapter is a fire and forget action, if you want to wait for the response you should use an outbound-gateway that will wait for response, and connect the response to the original gateway, then in java sendAndReceive not just publish.

like image 197
Víctor Romero Avatar answered Oct 06 '22 18:10

Víctor Romero


If you can get an Item to determine, whether it is still needed or not (processingFinished() or something similar executed in the back-end-stages), you can register all Items at a central authority, which keeps track of the number of non-finished Items and effecitvely determines a termination-condition.

If this approach is feasible, you could even think of packaging the items into FutureTask-objects or make use of similar concepts from java.util.concurrent.

Edit: Second Idea:

Have you thought about making the channels more intelligent? A sender closes the channel once it does not send any more data. In this scenario, the worker-beans do not have to be deamon threads but can determine their termination-criterion based on a closed and empty input channel.

like image 38
Jonathan Avatar answered Oct 06 '22 16:10

Jonathan