I've been trying to write a basic terminal emulation script, because for some reason I have no terminal access on my Mac. But to write game engine scripts in Blender, the console, which normally appears in the terminal you started Blender from, is crucial.
For simple things like deleting or renaming files, I used to execute commands using `stream = os.popen(command)` and then `print(stream.read())`. That works fine for most things, but not for anything interactive.
Recently I discovered a new way: `sp = subprocess.Popen(["/bin/bash", "-i"], stdout = subprocess.PIPE, stdin = subprocess.PIPE, stderr = subprocess.PIPE)` and then `print(sp.communicate(command.encode()))`. That should spawn an interactive shell that I can use like a terminal, shouldn't it?
But either way I can't keep the connection open. Using the last example I can call `sp.communicate` only once, which gives me the following output (in this case for 'ls /') along with some errors: `(b'Applications\n[...]usr\nvar\n', b'bash: no job control in this shell\nbash-3.2$ ls /\nbash-3.2$ exit\n')`.
The second time I call it, it raises `ValueError: I/O operation on closed file.`
Sometimes (like for 'ls') I only get this error: `b'ls\nbash-3.2$ exit\n'`.
What does that mean? How can I emulate a terminal with Python that allows me to control an interactive shell, or run Blender and communicate with its console?
Assuming you want an interactive shell that keeps asking for input, you could try the following:
import subprocess
import re
# Minimal read-run-print loop (Python 2: relies on ``raw_input``).
# Each line typed at the prompt is split into words and run as a command;
# typing ``exit`` (any case) or ending the input leaves the loop.
while True:
    # Stop cleanly when input ends (Ctrl-D) or the user interrupts (Ctrl-C).
    # Only these two are caught so that real programming errors stay
    # visible instead of silently ending the loop.
    try:
        s = raw_input('> ')
    except (EOFError, KeyboardInterrupt):
        break
    # str.split() with no argument ignores leading/trailing whitespace and
    # never yields empty strings, so ' ls ' runs ``ls`` instead of passing
    # '' as the program name.
    args = s.split()
    if not args:
        # Blank line: nothing to run, just prompt again.
        continue
    # check if you should exit
    if s.strip().lower() == 'exit':
        break
    # try to run command; keep the ``try`` limited to the call that can
    # raise OSError (program not found / not executable).
    try:
        cmd = subprocess.Popen(args, stdout=subprocess.PIPE)
    except OSError:
        print('Invalid command')
        continue
    # communicate() reads stdout to EOF *and* waits for the child, so the
    # finished process is reaped rather than left behind unwaited.
    cmd_out = cmd.communicate()[0]
    # Process output
    print(cmd_out)
Here is something I worked on to do what you want on Windows, which is a much more difficult problem because Windows follows no standard but its own. A slight modification of this code should give you exactly what you are looking for.
'''
Created on Mar 2, 2013
@author: rweber
'''
import subprocess
import Queue
from Queue import Empty
import threading
class Process_Communicator():
    '''Collect and aggregate a subprocess's output without blocking.

    This is a simple class that collects and aggregates the output from a
    subprocess so that you can more reliably use the process without
    having to block for subprocess.communicate.

    Two reader threads copy lines from the child's stdout and stderr into
    queues.  An aggregator thread folds those lines into the string
    buffers ``unbblocked_out`` / ``unbblocked_err`` and forwards anything
    placed on ``stdin_queue`` to the child's stdin.  Callers poll
    ``has_stdout`` / ``get_stdout`` (and the stderr equivalents).
    '''

    # Seconds the polling loops pause between passes so that an idle
    # communicator does not spin a CPU core.
    _POLL_INTERVAL = 0.01

    def __init__(self, subp, line_ending='\n\r'):
        '''Start the reader and aggregator threads for ``subp``.

        subp        -- object exposing ``stdout``, ``stderr`` and ``stdin``
                       streams, e.g. a subprocess.Popen created with PIPE
                       for all three.
        line_ending -- text appended to every item written to stdin.  The
                       default (newline followed by carriage return) is
                       the terminator this class has always sent; pass a
                       lone newline when driving a POSIX shell.
        '''
        self.p = subp
        self.line_ending = line_ending
        self.unbblocked_out = ""
        self.unbblocked_err = ""
        self.running = True
        # Guards the two text buffers: the aggregator thread appends to
        # them while the caller's thread reads and clears them.
        self._lock = threading.Lock()
        # Used as an interruptible sleep by the polling loops; set by
        # join() so shutdown does not wait out a full pause.
        self._wake = threading.Event()
        # Every queue exists before any thread that touches it starts.
        self.qo = Queue.Queue()
        self.qe = Queue.Queue()
        self.stdin_queue = Queue.Queue()
        self.to = self._start_thread("out_read", self.enqueue_output)
        self.te = self._start_thread("err_read", self.enqueue_err)
        self.aggregator = self._start_thread("aggregate", self.aggregate)

    def _start_thread(self, name, target):
        '''Create, daemonize and start a worker thread running ``target``.'''
        worker = threading.Thread(name=name, target=target, args=())
        worker.daemon = True  # thread dies with the program
        worker.start()
        return worker

    def join(self):
        '''Wait for both readers to reach EOF, then stop the aggregator.

        The aggregator performs one last update() after it is told to
        stop, so everything the readers queued is in the buffers when
        this method returns.
        '''
        self.te.join()
        self.to.join()
        self.running = False
        self._wake.set()  # cut the aggregator's idle pause short
        self.aggregator.join()

    def _flush_stdin(self):
        '''Write every item currently on ``stdin_queue`` to the child.'''
        stdin = self.p.stdin
        if stdin is None:
            # No pipe to write to; leave the items queued.
            return
        while True:
            # get_nowait() cannot block even if another thread empties
            # the queue between an emptiness check and the get.
            try:
                item = self.stdin_queue.get_nowait()
            except Empty:
                return
            try:
                stdin.write(str(item) + self.line_ending)
                # Push the data to the child now; a buffered pipe would
                # otherwise hold the command back indefinitely.
                stdin.flush()
            except (IOError, OSError, ValueError):
                # Broken or closed pipe: the child is gone.  Drop the
                # input instead of letting the exception kill the thread
                # that also aggregates the remaining output.
                return

    def enqueue_in(self):
        '''Forward queued input to the child until the communicator stops.

        Not started by __init__ (update() already forwards stdin); kept
        for callers that want a dedicated stdin thread.
        '''
        while self.running and self.p.stdin is not None:
            self._flush_stdin()
            self._wake.wait(self._POLL_INTERVAL)

    def _pump(self, stream, sink):
        '''Copy lines from ``stream`` into the queue ``sink`` until EOF.'''
        if not stream or stream.closed:
            return
        while True:
            line = stream.readline()
            if not line:
                # readline() returns an empty value only at end of file.
                break
            sink.put(line)

    def enqueue_output(self):
        '''Reader-thread body: queue every line of the child's stdout.'''
        self._pump(self.p.stdout, self.qo)

    def enqueue_err(self):
        '''Reader-thread body: queue every line of the child's stderr.'''
        self._pump(self.p.stderr, self.qe)

    def aggregate(self):
        '''Aggregator-thread body: call update() until ``running`` clears.'''
        while (self.running):
            self.update()
            # Interruptible pause instead of a busy loop.
            self._wake.wait(self._POLL_INTERVAL)
        # Final pass picks up whatever arrived just before the stop.
        self.update()

    def _drain(self, source):
        '''Remove every line currently in ``source``; return them joined.'''
        lines = []
        while True:
            try:
                lines.append(source.get_nowait())  # or q.get(timeout=.1)
            except Empty:
                return "".join(lines)

    def update(self):
        '''Move queued output into the buffers, then forward queued stdin.'''
        err_text = self._drain(self.qe)
        out_text = self._drain(self.qo)
        with self._lock:
            self.unbblocked_err += err_text
            self.unbblocked_out += out_text
        self._flush_stdin()

    def get_stdout(self, clear=True):
        '''Return the buffered stdout text; empty the buffer if ``clear``.'''
        with self._lock:
            ret = self.unbblocked_out
            if clear:
                self.unbblocked_out = ""
        return ret

    def has_stdout(self):
        '''Return the buffered stdout text without clearing it, or None
        when the buffer is empty.'''
        ret = self.get_stdout(False)
        return None if ret == '' else ret

    def get_stderr(self, clear=True):
        '''Return the buffered stderr text; empty the buffer if ``clear``.'''
        with self._lock:
            ret = self.unbblocked_err
            if clear:
                self.unbblocked_err = ""
        return ret

    def has_stderr(self):
        '''Return the buffered stderr text without clearing it, or None
        when the buffer is empty.'''
        ret = self.get_stderr(False)
        return None if ret == '' else ret
def write_stdin(p,c):
    '''Prompt for console input and queue it for the process.

    p -- object whose ``poll()`` returns None while the process is alive.
    c -- object with a ``stdin_queue`` that accepts ``put(line)``.

    Loops until ``p.poll()`` reports an exit status or console input
    ends.  Each line read is put on ``c.stdin_queue`` unchanged.
    '''
    while p.poll() is None:
        try:
            i = raw_input("send to process:")
        except EOFError:
            # Console input is closed; nothing more can be read, so stop
            # instead of letting the exception end the thread with a
            # traceback.
            break
        # raw_input always returns a string, so no None check is needed.
        c.stdin_queue.put(i)
# Driver: run cmd.exe behind a Process_Communicator, relay console input
# to it on a helper thread, and echo its output until it exits.
import time  # needed only by the polling loop below

p = subprocess.Popen("cmd.exe", shell=True, stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE, stdin=subprocess.PIPE)
c = Process_Communicator(p)
stdin = threading.Thread(name="write_stdin",
                         target=write_stdin,
                         args=(p,c))
stdin.daemon = True # thread dies with the program
stdin.start()
while p.poll() is None:
    # Take each buffer once and print it only if it holds text.
    for pending in (c.get_stdout(), c.get_stderr()):
        if pending:
            print(pending)
    # Short pause so the loop does not spin a CPU core while idle.
    time.sleep(0.05)
c.join()
# join() lets the communicator collect whatever the process wrote just
# before exiting; print it so the tail of the output is not lost.
for pending in (c.get_stdout(), c.get_stderr()):
    if pending:
        print(pending)
print("Exit")
If you love us, you can donate to us via PayPal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With