
Using Spring threading and TaskExecutor, how do I know when a thread is finished?


Alright, possibly a naive question here. I have a service that needs to log into multiple network devices, run a command on each and collect the results. For speed, rather than querying each device in sequence, I need to access them all concurrently and consume the results once they are done.

Using the Spring framework and JSch I'm quite easily able to query each device correctly. Where I get confused is in rewiring the beans to use TaskExecutor. What I can't figure out is how to know when the thread has finished.

What I have so far is this:

    public class RemoteCommand {

        private String user;
        private String host;
        private String password;
        private String command;
        private List<String> commandResults;
        private TaskExecutor taskExecutor;

        public RemoteCommand(String user, String host, String password, TaskExecutor taskExecutor) {

            setUser(user);
            setHost(host);
            setPassword(password);
            setTaskExecutor(taskExecutor);
        }

        /**
         * @param user the user to set
         */
        public void setUser(String user) {
            this.user = user;
        }

        /**
         * @return the user
         */
        public String getUser() {
            return user;
        }

        /**
         * @param host the host to set
         */
        public void setHost(String host) {
            this.host = host;
        }

        /**
         * @return the host
         */
        public String getHost() {
            return host;
        }

        /**
         * @param password the password to set
         */
        public void setPassword(String password) {
            this.password = password;
        }

        /**
         * @return the password
         */
        public String getPassword() {
            return password;
        }

        /**
         * @param command the command to set
         */
        private void setCommand(String command) {
            this.command = command;
        }

        /**
         * @return the command
         */
        private String getCommand() {
            return command;
        }

        /**
         * @param commandResults the commandResults to set
         */
        private void setCommandResults(List<String> commandResults) {
            this.commandResults = commandResults;
        }

        /**
         * @return the commandResults
         */
        public List<String> getCommandResults(String command) {
            taskExecutor.execute(new CommandTask(command) );

            return commandResults;
        }

        /**
         * @param taskExecutor the taskExecutor to set
         */
        public void setTaskExecutor(TaskExecutor taskExecutor) {
            this.taskExecutor = taskExecutor;
        }

        /**
         * @return the taskExecutor
         */
        public TaskExecutor getTaskExecutor() {
            return taskExecutor;
        }

        private class CommandTask implements Runnable {

            public CommandTask(String command) {
                setCommand(command);
                System.out.println("test: " + getCommand());
            }

            /**
             *
             * @param command
             */
            public void run() {

                List<String> results = new LinkedList<String>();
                String command = getCommand();

                try {
                    System.out.println("running");
                    JSch jsch = new JSch();

                    String user = getUser();
                    String host = getHost();

                    java.util.Properties config = new java.util.Properties();

                    config.put("StrictHostKeyChecking", "no");

                    host = host.substring(host.indexOf('@') + 1);
                    Session session = jsch.getSession(user, host, 22);

                    session.setPassword(getPassword());
                    session.setConfig(config);
                    session.connect();

                    Channel channel = session.openChannel("exec");
                    ((ChannelExec) channel).setCommand(command);

                    channel.setInputStream(null);

                    ((ChannelExec) channel).setErrStream(System.err);

                    InputStream in = channel.getInputStream();

                    channel.connect();
                    byte[] tmp = new byte[1024];
                    while (true) {
                        while (in.available() > 0) {
                            int i = in.read(tmp, 0, 1024);
                            if (i < 0)
                                break;
                            results.add(new String(tmp, 0, i));
                            System.out.print(new String(tmp, 0, i));
                        }
                        if (channel.isClosed()) {
                            //System.out.println("exit-status: "
                            //      + channel.getExitStatus());
                            break;
                        }
                        try {
                            Thread.sleep(1000);
                        } catch (Exception ee) {
                            ee.printStackTrace();
                        }
                    }
                    channel.disconnect();
                    session.disconnect();
                } catch (Exception e) {
                    System.out.println(e);
                }
                setCommandResults(results);
                System.out.println("finished running");
            }
        }
    }

Within my junit test I have:

    @Test
    public void testRemoteExecution() {

        remoteCommand = (RemoteCommand) applicationContext.getBean("remoteCommand");
        remoteCommand.getCommandResults("scripts/something.pl xxx.xxx.xxx.xxx");

        //List<String> results = remoteCommand.getCommandResults("scripts/something.pl xxx.xxx.xxx.xxx");
        //for (String line : results) {
        //  System.out.println(line.trim());
        //}
    }

My applicationContext.xml file:

    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="5" />
        <property name="maxPoolSize" value="10" />
        <property name="queueCapacity" value="25" />
    </bean>

    <!-- ******************** -->
    <!--      Utilities       -->
    <!-- ******************** -->

    <bean name="remoteCommand" class="com.xxx.ncc.sonet.utilities.RemoteCommand" scope="prototype">
        <description>Remote Command</description>
        <constructor-arg><value>${remote.user}</value></constructor-arg>
        <constructor-arg><value>${remote.host}</value></constructor-arg>
        <constructor-arg><value>${remote.password}</value></constructor-arg>
        <constructor-arg ref="taskExecutor" />
    </bean>

I get as far as the first println in the run() method. Then the test exits cleanly with no errors, and I never reach the second println at the bottom of that routine. I've looked at this thread here, which was very useful, but it was not implemented in a Spring-specific fashion. I'm sure I'm missing something simple, or have completely run off the rails here. Any help is appreciated.

Bill asked Feb 15 '10 21:02




2 Answers

The TaskExecutor interface is a fire-and-forget interface, for use when you don't care when the task finishes. It's the simplest async abstraction that Spring offers.

There is, however, an enhanced interface, AsyncTaskExecutor, which provides additional methods, including submit() methods that return a Future, which lets you wait on the result.

Spring provides the ThreadPoolTaskExecutor class, which implements both TaskExecutor and AsyncTaskExecutor.

In your specific case, I would re-implement the Runnable as a Callable, and return the commandResults from the Callable.call() method. The getCommandResults method can then be reimplemented as:

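    // Assumes the taskExecutor field is declared as AsyncTaskExecutor (plain TaskExecutor has no submit())
    // and that CommandTask now implements Callable<List<String>>.
    public List<String> getCommandResults(String command) throws InterruptedException, ExecutionException {
        Future<List<String>> futureResults = taskExecutor.submit(new CommandTask(command));
        // get() blocks until the task completes; it throws the checked exceptions declared above.
        return futureResults.get();
    }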

This method will submit the task asynchronously, and then wait for it to complete before returning the results returned from the Callable.call() method. This also lets you get rid of the commandResults field.
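Since the original goal was to query several devices at once, here is a minimal sketch of the submit-then-wait pattern applied to more than one task. It is only an illustration under stated assumptions: a plain java.util.concurrent.ExecutorService stands in for Spring's AsyncTaskExecutor (both offer a submit(Callable) that returns a Future), the class and method names (ConcurrentCommands, runOnAll) are hypothetical, and the JSch work is replaced by a placeholder string.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ConcurrentCommands {

    // Hypothetical replacement for the Runnable in the question:
    // it returns its results instead of storing them in a shared field.
    static class CommandTask implements Callable<List<String>> {
        private final String host;
        private final String command;

        CommandTask(String host, String command) {
            this.host = host;
            this.command = command;
        }

        @Override
        public List<String> call() {
            // Placeholder: the JSch session code from the question would run here.
            return Arrays.asList(host + ": ran " + command);
        }
    }

    static List<String> runOnAll(List<String> hosts, String command) throws Exception {
        // Assumption: a fixed pool of 5 threads, mirroring corePoolSize in the question.
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            // Submit every task first so they all run concurrently.
            List<Future<List<String>>> futures = new ArrayList<>();
            for (String host : hosts) {
                futures.add(pool.submit(new CommandTask(host, command)));
            }
            // Then wait: get() blocks until that particular task has finished.
            List<String> all = new ArrayList<>();
            for (Future<List<String>> future : futures) {
                all.addAll(future.get());
            }
            return all;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        for (String line : runOnAll(Arrays.asList("host-a", "host-b"), "uptime")) {
            System.out.println(line);
        }
    }
}
```

Calling get() only after all tasks have been submitted is what keeps the devices running in parallel; calling it right after each submit would serialize them again.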

skaffman answered Oct 21 '22 06:10



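    // Assumes CommandTask implements Callable<List<String>>.
    public List<String> getCommandResults(String command) throws InterruptedException, ExecutionException {
        FutureTask<List<String>> task = new FutureTask<List<String>>(new CommandTask(command));
        taskExecutor.execute(task);

        // Alternatively: task.get(); return commandResults; but that is not good practice.
        return task.get();
    }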
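To see why this works with a plain execute() call, here is a small self-contained sketch. FutureTask is both a Runnable, so any executor accepts it, and a Future, so the caller can block on get(). The names FutureTaskDemo and the thread-per-task Executor are assumptions for illustration; java.util.concurrent.Executor stands in for Spring's TaskExecutor, and the command output is a placeholder.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;

class FutureTaskDemo {

    static List<String> getCommandResults(Executor executor, String command) throws Exception {
        // Wrap the work in a FutureTask; the lambda is treated as a Callable<List<String>>.
        FutureTask<List<String>> task =
                new FutureTask<>(() -> Arrays.asList("output of " + command));

        // execute() returns immediately; the task runs on whatever thread the executor picks.
        executor.execute(task);

        // get() blocks the calling thread until the task has finished.
        return task.get();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: a trivial executor that starts one new thread per task.
        Executor executor = runnable -> new Thread(runnable).start();
        System.out.println(getCommandResults(executor, "uptime"));
    }
}
```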
Igor Artamonov answered Oct 21 '22 07:10
