Alright, possible a naive question here. I have a service that needs to log into multiple network devices, run a command on each and collect the results. For speed, rather than collect the information on each device in sequence, I need to access them all concurrently and consume the results after they are done.
Using the Spring framework and Jsch I'm quite easily able to query each device correctly. Where I am running into some confusion is in trying to rewire the beans to use TaskExecutor to accomplish this. What I can't figure out how to do is how to know when the thread is finished.
What I have so far is this:
public class RemoteCommand { private String user; private String host; private String password; private String command; private List<String> commandResults; private TaskExecutor taskExecutor; public RemoteCommand(String user, String host, String password, TaskExecutor taskExecutor) { setUser(user); setHost(host); setPassword(password); setTaskExecutor(taskExecutor); } /** * @param user the user to set */ public void setUser(String user) { this.user = user; } /** * @return the user */ public String getUser() { return user; } /** * @param host the host to set */ public void setHost(String host) { this.host = host; } /** * @return the host */ public String getHost() { return host; } /** * @param password the password to set */ public void setPassword(String password) { this.password = password; } /** * @return the password */ public String getPassword() { return password; } /** * @param command the command to set */ private void setCommand(String command) { this.command = command; } /** * @return the command */ private String getCommand() { return command; } /** * @param commandResults the commandResults to set */ private void setCommandResults(List<String> commandResults) { this.commandResults = commandResults; } /** * @return the commandResults */ public List<String> getCommandResults(String command) { taskExecutor.execute(new CommandTask(command) ); return commandResults; } /** * @param taskExecutor the taskExecutor to set */ public void setTaskExecutor(TaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } /** * @return the taskExecutor */ public TaskExecutor getTaskExecutor() { return taskExecutor; } private class CommandTask implements Runnable { public CommandTask(String command) { setCommand(command); System.out.println("test: " + getCommand()); } /** * * @param command */ public void run() { List<String> results = new LinkedList<String>(); String command = getCommand(); try { System.out.println("running"); JSch jsch = new JSch(); String user = getUser(); String host = getHost(); java.util.Properties config = new java.util.Properties(); config.put("StrictHostKeyChecking", "no"); host = host.substring(host.indexOf('@') + 1); Session session = jsch.getSession(user, host, 22); session.setPassword(getPassword()); session.setConfig(config); session.connect(); Channel channel = session.openChannel("exec"); ((ChannelExec) channel).setCommand(command); channel.setInputStream(null); ((ChannelExec) channel).setErrStream(System.err); InputStream in = channel.getInputStream(); channel.connect(); byte[] tmp = new byte[1024]; while (true) { while (in.available() > 0) { int i = in.read(tmp, 0, 1024); if (i < 0) break; results.add(new String(tmp, 0, i)); System.out.print(new String(tmp, 0, i)); } if (channel.isClosed()) { //System.out.println("exit-status: " // + channel.getExitStatus()); break; } try { Thread.sleep(1000); } catch (Exception ee) { ee.printStackTrace(); } } channel.disconnect(); session.disconnect(); } catch (Exception e) { System.out.println(e); } setCommandResults(results); System.out.println("finished running"); } } }
Within my junit test I have:
@Test public void testRemoteExecution() { remoteCommand = (RemoteCommand) applicationContext.getBean("remoteCommand"); remoteCommand.getCommandResults("scripts/something.pl xxx.xxx.xxx.xxx"); //List<String> results = remoteCommand.getCommandResults("scripts/something.pl xxx.xxx.xxx.xxx"); //for (String line : results) { // System.out.println(line.trim()); //} }
My applicationContext.xml file:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="5" /> <property name="maxPoolSize" value="10" /> <property name="queueCapacity" value="25" /> </bean> <!-- ******************** --> <!-- Utilities --> <!-- ******************** --> <bean name="remoteCommand" class="com.xxx.ncc.sonet.utilities.RemoteCommand" scope="prototype"> <description>Remote Command</description> <constructor-arg><value>${remote.user}</value></constructor-arg> <constructor-arg><value>${remote.host}</value></constructor-arg> <constructor-arg><value>${remote.password}</value></constructor-arg> <constructor-arg ref="taskExecutor" /> </bean>
I get as far as the first println in the run() method. Then the test exits cleanly with no errors. I never get to the second println at the bottom of that routine. I've looked at this thread here, which was very useful, but not implemented in a Spring specific fashion. I'm sure I'm missing something simple, or have completely run off the rails here. Any help is appreciated.
Use Thread. join() in your main thread to wait in a blocking fashion for each Thread to complete, or. Check Thread.
Short Answer: When a thread has finished its process, if nothing else holds a reference to it, the garbage collector will dispose of it automatically.
Once a thread in the thread pool completes its task, it's returned to a queue of waiting threads. From this moment it can be reused. This reuse enables applications to avoid the cost of creating a new thread for each task.
The TaskExecutor
interface is a fire-and-forget interface, for use when you don't care when the task finishes. It's the simplest async abstraction that Spring offers.
There is , however, an enhanced interface, AsyncTaskExecutor
, which provides additional methods, including submit()
methods that return a Future
, which let you wait on the result.
Spring provides the ThreadPoolTaskExecutor
class, which implement both TaskExecutor
and AsyncTaskExecutor
.
In your specific case, I would re-implement the Runnable
as a Callable
, and return the commandResults
from the Callable.call()
method. The getCommandResults
method can then be reimplemented as:
public List<String> getCommandResults(String command) { Future<List<String>> futureResults = taskExecutor.submit(new CommandTask(command)); return futureResults.get(); }
This method will submit the task asynchronously, and then wait for it to complete before returning the results returned from the Callable.call()
method. This also lets you get rid of the commandResults
field.
public List<String> getCommandResults(String command) { FutureTask task = new FutureTask(new CommandTask(command)) taskExecutor.execute(task); return task.get(); //or task.get(); return commandResults; - but it not a good practice }
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With