I am using Paramiko to monitor logs on remote machines during a test run.
The monitoring happens in a daemon thread, which pretty much does this:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
transport = ssh.get_transport()
channel = transport.open_session()
channel.exec_command('sudo tail -f ' + self.logfile)
last_partial = ''
while not self.stopped.isSet():
    try:
        if None == select or None == channel:
            break
        rl, wl, xl = select.select([channel], [], [], 1.0)
        if None == rl:
            break
        if len(rl) > 0:
            # Must be stdout, how can I check?
            line = channel.recv(1024)
        else:
            time.sleep(1.0)
            continue
    except:
        break
    if line:
        #handle saving the line... lines are 'merged' so that one log is made from all the sources
ssh.close()
I had problems with blocking reads so I started doing things this way and most of the time it works nicely. I think that I run into problems when there is network slowness.
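Since recv(1024) returns arbitrary chunks rather than whole lines, a log line can be split across two reads, which is presumably what last_partial is for. Here is a minimal sketch of that bookkeeping, assuming newline-terminated log lines and text (not bytes) input; split_chunk is a hypothetical helper name, not part of Paramiko:

```python
def split_chunk(last_partial, chunk):
    """Return (complete_lines, new_partial) for one chunk of received text.

    Hypothetical helper: `last_partial` is the unterminated tail left over
    from the previous read, `chunk` is the text just returned by recv().
    """
    data = last_partial + chunk
    lines = data.split('\n')
    # The final element is '' when data ends with a newline; otherwise it
    # is an incomplete line that has to wait for the next chunk.
    new_partial = lines.pop()
    return lines, new_partial
```

In the loop above this would be used as lines, last_partial = split_chunk(last_partial, line). On Python 3, recv() returns bytes, so the chunk would need decoding first.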
Sometimes I see this error at the end of a run (after self.stopped above is set). I've tried sleeping after setting stopped and joining all the monitor threads but the hang can still happen.
Exception in thread Thread-9 (most likely raised during interpreter shutdown):
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
  File "/usr/lib/python2.6/site-packages/paramiko/transport.py", line 1470, in run
<type 'exceptions.AttributeError'>: 'NoneType' object has no attribute 'error'
In transport.py of Paramiko, I think this is where the error is. Look for #<<<<<<<<<<<<<<<<<<<<<<<<<<< below.
                self._channel_handler_table[ptype](chan, m)
            elif chanid in self.channels_seen:
                self._log(DEBUG, 'Ignoring message for dead channel %d' % chanid)
            else:
                self._log(ERROR, 'Channel request for unknown channel %d' % chanid)
                self.active = False
                self.packetizer.close()
        elif (self.auth_handler is not None) and (ptype in self.auth_handler._handler_table):
            self.auth_handler._handler_table[ptype](self.auth_handler, m)
        else:
            self._log(WARNING, 'Oops, unhandled type %d' % ptype)
            msg = Message()
            msg.add_byte(cMSG_UNIMPLEMENTED)
            msg.add_int(m.seqno)
            self._send_message(msg)
except SSHException as e:
    self._log(ERROR, 'Exception: ' + str(e))
    self._log(ERROR, util.tb_strings()) #<<<<<<<<<<<<<<<<<<<<<<<<<<< line 1470
    self.saved_exception = e
except EOFError as e:
    self._log(DEBUG, 'EOF in transport thread')
    #self._log(DEBUG, util.tb_strings())
    self.saved_exception = e
except socket.error as e:
    if type(e.args) is tuple:
        if e.args:
            emsg = '%s (%d)' % (e.args[1], e.args[0])
        else: # empty tuple, e.g. socket.timeout
            emsg = str(e) or repr(e)
    else:
        emsg = e.args
    self._log(ERROR, 'Socket exception: ' + emsg)
    self.saved_exception = e
except Exception as e:
    self._log(ERROR, 'Unknown exception: ' + str(e))
    self._log(ERROR, util.tb_strings())
When a run is stuck, I can run sudo lsof -i -n | egrep '\' to see that there are indeed ssh connections that are stuck indefinitely. My main test process is PID 15010.
sshd 6478 root 3u IPv4 46405 0t0 TCP *:ssh (LISTEN)
sshd 6478 root 4u IPv6 46407 0t0 TCP *:ssh (LISTEN)
sshd 14559 root 3r IPv4 3287615 0t0 TCP 172.16.0.171:ssh->10.42.80.100:59913 (ESTABLISHED)
sshd 14563 cmead 3u IPv4 3287615 0t0 TCP 172.16.0.171:ssh->10.42.80.100:59913 (ESTABLISHED)
python 15010 root 12u IPv4 3291525 0t0 TCP 172.16.0.171:43227->172.16.0.142:ssh (ESTABLISHED)
python 15010 root 15u IPv4 3291542 0t0 TCP 172.16.0.171:41928->172.16.0.227:ssh (ESTABLISHED)
python 15010 root 16u IPv4 3291784 0t0 TCP 172.16.0.171:57682->172.16.0.48:ssh (ESTABLISHED)
python 15010 root 17u IPv4 3291779 0t0 TCP 172.16.0.171:43246->172.16.0.142:ssh (ESTABLISHED)
python 15010 root 20u IPv4 3291789 0t0 TCP 172.16.0.171:41949->172.16.0.227:ssh (ESTABLISHED)
python 15010 root 65u IPv4 3292014 0t0 TCP 172.16.0.171:51886->172.16.0.226:ssh (ESTABLISHED)
sshd 15106 root 3r IPv4 3292962 0t0 TCP 172.16.0.171:ssh->10.42.80.100:60540 (ESTABLISHED)
sshd 15110 cmead 3u IPv4 3292962 0t0 TCP 172.16.0.171:ssh->10.42.80.100:60540 (ESTABLISHED)
So, I really just want my process not to hang. Also, I don't want to update Paramiko if that requires updating Python past 2.6.6, because I am on CentOS and, from what I have read, going past 2.6.6 can be 'complicated'.
Thanks for any ideas.
Comment to shavenwarthog that is too long for comments:
Hi, thanks for the answer. I have a couple of quick questions. 1) What if I need to stop the threads at an unknown time? In other words, the tail -f blah.log threads will run for maybe 3 minutes, and I want to check the accumulated data maybe 10 times in those three minutes. 2) Kind of the same, I guess: when I tried this with some actual remote machines it wouldn't exit (since tail -f never exits). I had forgotten about this, but I think the non-blocking read was meant to solve it. Do you think the other thread you commented on plus this one is enough to make this work? Basically, I would use my non-blocking read to gather data local to each runner thread. Then I would only need to lock when the main thread wants data from each runner, which would split my one lock into, say, 10 locks, and that should help. Does that make sense?
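One way to get the "data local to each runner, collected by the main thread now and then" arrangement described above is to give each runner its own thread-safe queue, so no explicit lock is needed. This is only a sketch under assumptions: runner, drain, and the source argument are hypothetical names, and source stands in for whatever yields lines (for example the stdout file object returned by exec_command).

```python
import threading

try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

def runner(source, out_queue, stopped):
    """Worker: push each line from `source` onto this worker's own queue."""
    for line in source:
        # The flag is only checked between lines; a read that is blocked
        # waiting for data will not notice it until the next line arrives.
        if stopped.is_set():
            break
        out_queue.put(line)

def drain(out_queue):
    """Parent: collect whatever has accumulated so far, without blocking."""
    lines = []
    while True:
        try:
            lines.append(out_queue.get_nowait())
        except queue.Empty:
            return lines
```

The main thread would keep one queue per runner and call drain() on each whenever it wants the accumulated data. Because the stop flag is only seen between lines, a tail -f that has gone quiet still needs a read timeout or a closed channel to wake the worker up.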
The following code runs a command on multiple hosts. When each command has some data waiting, it gets printed to the screen.
The overall form is adapted from Alex Martelli's code. This version has more logging, including showing a human-readable version of each connection's host.
The original code was written for commands that run then exit. I changed it to print data incrementally, when it's available. Previously, the first thread that grabbed the lock would block on the read(), and all the threads would starve. The new solution bypasses that.
EDIT, some notes:
To stop the program at a later time, we run into a rather sticky situation. Threads are uninterruptible; we can't just set up a signal handler to sys.exit() the program. The updated code is set up to exit safely after 3 seconds, by using a while loop to join() each thread. For real code, if the parent exits then the threads should also exit correctly. Note carefully the two WARNINGs in the code, because the signal/exit/thread interaction is rather squirrely.
The code processes data as it comes in; right now the data is just printed to the console. It doesn't use non-blocking reads because 1) non-blocking code is much more complex, and 2) the original program didn't process the child threads' data in the parent. For threads, it's easier to do everything in the child thread, which writes to a file, database, or service. For anything more complicated, use multiprocessing, which is much easier and has nice facilities for doing lots of jobs and restarting them if they die. That library also lets you distribute load across multiple CPUs, which threading in CPython does not do for Python code because of the global interpreter lock.
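If non-blocking reads turn out to be needed after all (for example, to stop a tail -f at an arbitrary time), one possible shape is a polling loop like the sketch below. It assumes the channel behaves like paramiko.Channel, whose recv_ready(), recv_stderr_ready(), recv(), recv_stderr(), exit_status_ready(), and close() methods it relies on; poll_channel, handle, and interval are hypothetical names chosen for illustration.

```python
import time

def poll_channel(channel, stopped, handle, interval=0.1):
    """Read from `channel` until `stopped` is set or the remote command exits.

    `channel` is expected to behave like paramiko.Channel, `stopped` is a
    threading.Event, and `handle(stream_name, data)` is a hypothetical
    callback that stores or prints the data.
    """
    try:
        while not stopped.is_set():
            got_data = False
            if channel.recv_ready():
                handle('stdout', channel.recv(1024))
                got_data = True
            if channel.recv_stderr_ready():
                handle('stderr', channel.recv_stderr(1024))
                got_data = True
            if got_data:
                continue
            if channel.exit_status_ready():
                break  # command finished and nothing was pending this pass
            time.sleep(interval)
    finally:
        # Close explicitly so the connection is not left open after the loop.
        channel.close()
```

Checking recv_ready() and recv_stderr_ready() separately also tells stdout from stderr. The loop never blocks for longer than interval, so setting the event stops the worker promptly, and the channel is closed on the way out.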
Have fun!
EDIT #2
Note it's possible, and probably preferred, to run multiple processes without using threading or multiprocessing. TLDR: use Popen and a select() loop to process batches of output. See sample code in Pastebin: run multiple commands without subprocess/multiprocessing
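As a rough illustration of the Popen plus select() idea (not the Pastebin code itself), the sketch below starts several commands and reads whichever pipe has output ready. It assumes a POSIX system, since select() on pipes does not work on Windows; run_all and its parameters are hypothetical names.

```python
import os
import select
import subprocess

def run_all(commands, timeout=1.0):
    """Run each argv list in `commands`, collecting stdout as it arrives.

    Returns a dict mapping each command's index to the bytes it wrote.
    POSIX only: select() on pipes does not work on Windows.
    """
    procs = [subprocess.Popen(cmd, stdout=subprocess.PIPE) for cmd in commands]
    fd_to_index = dict((p.stdout.fileno(), i) for i, p in enumerate(procs))
    output = dict((i, b'') for i in range(len(procs)))
    open_fds = set(fd_to_index)
    while open_fds:
        # Wait up to `timeout` seconds for any pipe to become readable.
        readable, _, _ = select.select(list(open_fds), [], [], timeout)
        for fd in readable:
            data = os.read(fd, 4096)
            if data:
                output[fd_to_index[fd]] += data
            else:
                open_fds.discard(fd)  # EOF: this command closed its stdout
    for p in procs:
        p.stdout.close()
        p.wait()
    return output
```

Each command here would be something like ['ssh', host, 'tail -f /var/log/syslog'], which is an assumption about how the remote commands are launched. The loop ends only when every command has closed its stdout, so a never-ending tail -f still needs its own stop condition, such as a deadline after which the processes are terminated.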
# adapted from https://stackoverflow.com/questions/3485428/creating-multiple-ssh-connections-at-a-time-using-paramiko
import signal, sys, threading
import paramiko

CMD = 'tail -f /var/log/syslog'

def signal_cleanup(_signum, _frame):
    print '\nCLEANUP\n'
    sys.exit(0)

def workon(host):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(host)
    _stdin, stdout, _stderr = ssh.exec_command(CMD)
    # iterating over stdout blocks until the next line arrives
    for line in stdout:
        print threading.current_thread().name, line,

def main():
    hosts = ['localhost', 'localhost']

    # exit after a few seconds (see WARNINGs)
    signal.signal(signal.SIGALRM, signal_cleanup)
    signal.alarm(3)

    threads = [
        threading.Thread(
            target=workon,
            args=(host,),
            # explicit index {0}: a bare {} needs Python 2.7 or later
            name='host #{0}'.format(num+1)
        )
        for num,host in enumerate(hosts)
    ]

    print 'starting'
    for t in threads:
        # WARNING: daemon=True allows program to exit when main proc
        # does; otherwise we'll wait until all threads complete.
        t.daemon = True
        t.start()

    print 'joining'
    for t in threads:
        # WARNING: t.join() is uninterruptible; this while loop allows
        # signals
        # see: http://snakesthatbite.blogspot.com/2010/09/cpython-threading-interrupting.html
        while t.is_alive():
            t.join(timeout=0.1)

    print 'done!'

if __name__=='__main__':
    main()
starting
joining
host #2 Jun 27 16:28:25 palabras kernel: [158950.369443] ideapad_laptop: Unknown event: 1
host #2 Jun 27 16:29:12 palabras kernel: [158997.098833] ideapad_laptop: Unknown event: 1
host #1 Jun 27 16:28:25 palabras kernel: [158950.369443] ideapad_laptop: Unknown event: 1
host #1 Jun 27 16:29:12 palabras kernel: [158997.098833] ideapad_laptop: Unknown event: 1
host #1 Jun 27 16:29:36 palabras kernel: [159020.809748] ideapad_laptop: Unknown event: 1