I'm working on a bot for a competition that receives its input through sys.stdin
and uses Python's print()
for output. I have the following:
import sys


def main():
    """Read commands from stdin one line at a time until end-of-file.

    Each line is split on whitespace; blank lines are skipped. The loop
    ends when the input stream is closed.
    """
    while True:
        line = sys.stdin.readline()
        if not line:
            # readline() returns "" only at end-of-file (a blank line is
            # "\n"); without this check the loop would spin forever once
            # the stream is closed.
            break
        parts = line.split()
        if parts:
            pass  # do stuff
The problem is that the input comes in through a stream and using the above, blocks me from printing anything back until the stream is closed. What can I do to make this work?
sys.stdin.readline() is the fastest option when reading strings, and input() when reading integers.
The sys.stdin.read() method reads input from the user until end-of-file (for example, the Enter key followed by Ctrl + D) and then returns everything it read as a single string.
Ctrl + D closes the standard input (stdin) by sending EOF.
By turning blocking off you can only read a character at a time. So, there is no way to get readline()
to work in a non-blocking context. I assume you just want to read key presses to control the robot.
I have had no luck using select.select()
on Linux and created a way with tweaking termios
settings. So, this is Linux specific but works for me:
import atexit
import os
import sys
import termios
import time

# Terminal attributes saved by init_any_key() so they can be restored at exit.
old_settings = None


def init_any_key():
    """Put the terminal on stdin into no-echo, non-canonical, non-blocking mode.

    The current attributes are stored in the module-level ``old_settings``
    so that ``term_any_key`` can restore them when the interpreter exits.
    """
    global old_settings
    old_settings = termios.tcgetattr(sys.stdin)
    new_settings = termios.tcgetattr(sys.stdin)
    # lflags: turn off echo and canonical (line-buffered) input.
    new_settings[3] = new_settings[3] & ~(termios.ECHO | termios.ICANON)
    # cc: with VMIN = 0 and VTIME = 0 a read returns immediately, with zero
    # bytes when nothing has been typed.
    new_settings[6][termios.VMIN] = 0
    new_settings[6][termios.VTIME] = 0
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, new_settings)


@atexit.register
def term_any_key():
    """Restore the terminal attributes saved by ``init_any_key``, if any."""
    if old_settings:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)


def any_key():
    """Return the byte values of every key press currently pending on stdin.

    Returns:
        A list of integers, one per byte read; an empty list when no input
        is pending.
    """
    ch_set = []
    ch = os.read(sys.stdin.fileno(), 1)
    while ch:
        # os.read() returns bytes; on Python 3 indexing bytes already yields
        # an int, so ord(ch[0]) would raise TypeError. bytearray yields ints.
        ch_set.extend(bytearray(ch))
        ch = os.read(sys.stdin.fileno(), 1)
    return ch_set


init_any_key()
while True:
    key = any_key()
    # any_key() always returns a list, so test for emptiness rather than
    # None; otherwise the loop never sleeps and prints [] continuously.
    if key:
        print(key)
    else:
        time.sleep(0.1)
A better Windows or cross-platform answer is here: Non-blocking console input?
You can use selectors to handle I/O multiplexing:
https://docs.python.org/3/library/selectors.html
Try this out:
#! /usr/bin/python3
import fcntl
import os
import selectors
import sys

# set sys.stdin non-blocking
orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)


# function to be called when enter is pressed
def got_keyboard_data(stdin):
    """Print whatever is available on *stdin*; exit the program at EOF.

    Args:
        stdin: the readable file object registered with the selector.
    """
    data = stdin.read()
    if not data:
        # An empty read means stdin was closed (e.g. Ctrl + D). The stream
        # would stay "readable" forever, so stop instead of spinning.
        print()
        sys.exit(0)
    print('Keyboard input: {}'.format(data))


# register event
m_selector = selectors.DefaultSelector()
m_selector.register(sys.stdin, selectors.EVENT_READ, got_keyboard_data)

try:
    while True:
        sys.stdout.write('Type something and hit enter: ')
        sys.stdout.flush()
        # Blocks until a registered file object is ready.
        for k, mask in m_selector.select():
            callback = k.data
            callback(k.fileobj)
finally:
    # Runs on EOF exit and on Ctrl + C: release the selector and put stdin
    # back into its original (blocking) mode.
    m_selector.close()
    fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl)
The above code will hold on the line
for k, mask in m_selector.select():
until a registered event occurs; select() then returns a list of (key, events) tuples, which the loop unpacks into a SelectorKey instance (k) and a mask of the events that are ready (mask).
In the above example we registered only one event (Enter key press):
m_selector.register(sys.stdin, selectors.EVENT_READ, got_keyboard_data)
The register method is declared as follows:
abstractmethod register(fileobj, events, data=None)
Therefore, the register method sets k.data
as our callback function got_keyboard_data
, and the loop calls it when the Enter key is pressed:
callback = k.data callback(k.fileobj)
A more complete example (and hopefully more useful) would be to multiplex stdin data from user with incoming connections from network:
import selectors
import socket
import sys
import os
import fcntl

m_selector = selectors.DefaultSelector()


# set sys.stdin non-blocking
def set_input_nonblocking():
    """Set O_NONBLOCK on sys.stdin.

    Returns:
        The file status flags stdin had before the change, so the caller
        can restore them later.
    """
    orig_fl = fcntl.fcntl(sys.stdin, fcntl.F_GETFL)
    fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)
    return orig_fl


def create_socket(port, max_conn):
    """Create a non-blocking TCP socket listening on localhost.

    Args:
        port: TCP port to bind on 'localhost'.
        max_conn: backlog passed to listen().

    Returns:
        The listening socket.

    Raises:
        OSError: if the socket cannot be configured, bound or put into
            listening mode; the socket is closed before re-raising.
    """
    server_addr = ('localhost', port)
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.setblocking(False)
        server.bind(server_addr)
        server.listen(max_conn)
    except OSError:
        server.close()
        raise
    return server


def read(conn, mask):
    """Selector callback: print data received on an accepted connection.

    An empty read means the peer closed the connection; the connection is
    released and the main loop is asked to stop.
    """
    global GO_ON
    client_address = conn.getpeername()
    data = conn.recv(1024)
    print('Got {} from {}'.format(data, client_address))
    if not data:
        # Release the dead connection so it is not reported as readable again.
        m_selector.unregister(conn)
        conn.close()
        GO_ON = False


def accept(sock, mask):
    """Selector callback: accept a new connection and register it for reads."""
    new_conn, addr = sock.accept()
    new_conn.setblocking(False)
    print('Accepting connection from {}'.format(addr))
    m_selector.register(new_conn, selectors.EVENT_READ, read)


def quit():
    """Ask the main loop to stop. (Note: shadows the builtin ``quit``.)"""
    global GO_ON
    print('Exiting...')
    GO_ON = False


def from_keyboard(arg1, arg2):
    """Selector callback: handle input typed on stdin.

    Args:
        arg1: the readable stdin file object.
        arg2: the event mask (unused).
    """
    line = arg1.read()
    # An empty read means stdin hit EOF (e.g. Ctrl + D); treat it like
    # 'quit', otherwise stdin stays readable and the loop spins.
    if not line or line == 'quit\n':
        quit()
    else:
        print('User input: {}'.format(line))


GO_ON = True
orig_fl = set_input_nonblocking()

# listen to port 10000, at most 10 connections
server = create_socket(10000, 10)
m_selector.register(server, selectors.EVENT_READ, accept)
m_selector.register(sys.stdin, selectors.EVENT_READ, from_keyboard)

try:
    while GO_ON:
        sys.stdout.write('>>> ')
        sys.stdout.flush()
        for k, mask in m_selector.select():
            callback = k.data
            callback(k.fileobj, mask)
finally:
    # unregister events and close every socket still registered (the
    # listening socket and any accepted connections). No shutdown() call:
    # it requires a `how` argument and the listening socket is not a
    # connected socket anyway.
    for key in list(m_selector.get_map().values()):
        m_selector.unregister(key.fileobj)
        if key.fileobj is not sys.stdin:
            key.fileobj.close()

    # close select
    m_selector.close()

    # put stdin back into its original (blocking) mode
    fcntl.fcntl(sys.stdin, fcntl.F_SETFL, orig_fl)
You can test using two terminals. In the first terminal, run:
$ python3 test.py >>> bla
Open another terminal and run:
$ nc localhost 10000 hey!
Back in the first terminal:
>>> qwerqwer
Result (seen on the main terminal):
$ python3 test.py >>> bla User input: bla >>> Accepting connection from ('127.0.0.1', 39598) >>> Got b'hey!\n' from ('127.0.0.1', 39598) >>> qwerqwer User input: qwerqwer >>>
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With