Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Non-blocking input in python 3 [duplicate]

Tags:

python

input

I'm working on a bot for a competition that receives its input through sys.stdin and uses Python's print() for output. I have the following:

import sys

def main():
    """Read whitespace-separated commands from stdin, one line at a time.

    Blank lines are skipped.  The loop ends when stdin reaches end-of-file.
    """
    while True:
        line = sys.stdin.readline()
        if not line:
            # readline() returns '' only at EOF; without this check the loop
            # would spin forever once the stream is closed.
            break
        parts = line.split()
        if parts:
            # do stuff
            pass

The problem is that the input comes in through a stream and using the above, blocks me from printing anything back until the stream is closed. What can I do to make this work?

like image 818
Arthelais Avatar asked Feb 15 '14 00:02

Arthelais


4 Answers

By turning blocking off you can only read a character at a time. So, there is no way to get readline() to work in a non-blocking context. I assume you just want to read key presses to control the robot.

I had no luck using select.select() on Linux, so I came up with an approach that tweaks the termios settings instead. It is Linux-specific, but it works for me:

import atexit, termios
import sys, os
import time


old_settings = None  # terminal attributes saved by init_any_key()


def init_any_key():
    """Reconfigure the stdin terminal for unechoed, non-canonical reads.

    The attributes in effect beforehand are stored in the module-level
    ``old_settings``.
    """
    global old_settings
    old_settings = termios.tcgetattr(sys.stdin)
    attrs = termios.tcgetattr(sys.stdin)
    lflag_index, cc_index = 3, 6
    # Turn off echo and canonical (line-buffered) input.
    attrs[lflag_index] &= ~(termios.ECHO | termios.ICANON)
    # VMIN = 0 and VTIME = 0 in the control-character table.
    for slot in (termios.VMIN, termios.VTIME):
        attrs[cc_index][slot] = 0
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, attrs)


@atexit.register
def term_any_key():
    """Re-apply the terminal attributes held in ``old_settings``, if any.

    Registered with ``atexit`` so it runs when the interpreter exits.
    """
    if not old_settings:
        return
    termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)


def any_key():
    """Return the integer values of the bytes currently readable on stdin.

    Reads stdin one byte at a time through ``os.read`` until a read comes
    back empty, collecting each byte's integer value.

    Returns:
        A list of ints, one per byte read; empty when nothing was read.
    """
    fd = sys.stdin.fileno()
    ch_set = []
    while True:
        ch = os.read(fd, 1)
        if not ch:
            break
        # os.read() returns bytes.  On Python 3, indexing bytes already gives
        # an int, so ord(ch[0]) would raise TypeError; ord() applied to the
        # one-byte object itself is valid.
        ch_set.append(ord(ch))
    return ch_set


# Poll the keyboard forever: print the key codes that arrived, or nap briefly
# when there were none.
init_any_key()
while True:
    key = any_key()
    # any_key() always returns a list (never None), so test for emptiness;
    # an `is not None` check would make the sleep branch unreachable and
    # leave the loop spinning at full speed.
    if key:
        print(key)
    else:
        time.sleep(0.1)

A better Windows or cross-platform answer is here: Non-blocking console input?

like image 178
swdev Avatar answered Nov 13 '22 02:11

swdev


You can use the selectors module to handle I/O multiplexing:

https://docs.python.org/3/library/selectors.html

Try this out:

#! /usr/bin/python3

import sys
import fcntl
import os
import selectors

# Add O_NONBLOCK to stdin's file status flags, keeping the flags already set.
stdin_fd = sys.stdin.fileno()
orig_fl = fcntl.fcntl(stdin_fd, fcntl.F_GETFL)
fcntl.fcntl(stdin_fd, fcntl.F_SETFL, orig_fl | os.O_NONBLOCK)

def got_keyboard_data(stdin):
    """Callback for when Enter is pressed: print what can be read from *stdin*."""
    text = stdin.read()
    print('Keyboard input: {}'.format(text))

# Watch stdin for readability; the callback travels along as the key's data.
m_selector = selectors.DefaultSelector()
m_selector.register(sys.stdin, selectors.EVENT_READ, got_keyboard_data)

while True:
    sys.stdout.write('Type something and hit enter: ')
    sys.stdout.flush()
    # select() waits here until a registered object is ready.
    for key, _events in m_selector.select():
        key.data(key.fileobj)

The above code will hold on the line

for k, mask in m_selector.select():

until a registered event occurs; each item it returns pairs a SelectorKey instance (k) with a mask of the events that are ready.

In the above example we registered only one event (Enter key press):

m_selector.register(sys.stdin, selectors.EVENT_READ, got_keyboard_data)

The register method is declared as follows:

abstractmethod register(fileobj, events, data=None)

Therefore, the register method stores our callback function got_keyboard_data as k.data, and our loop retrieves it and calls it when the Enter key is pressed:

callback = k.data
callback(k.fileobj)

A more complete example (and hopefully more useful) would be to multiplex stdin data from user with incoming connections from network:

import selectors
import socket
import sys
import os
import fcntl

# Module-level selector used by accept() and by the main loop below.
m_selector = selectors.DefaultSelector()

def set_input_nonblocking():
    """Add O_NONBLOCK to the file status flags of sys.stdin."""
    stdin = sys.stdin
    current = fcntl.fcntl(stdin, fcntl.F_GETFL)
    fcntl.fcntl(stdin, fcntl.F_SETFL, current | os.O_NONBLOCK)

def create_socket(port, max_conn):
    """Create a non-blocking TCP server socket listening on localhost.

    Args:
        port: TCP port to bind on 'localhost'.
        max_conn: backlog value handed to ``listen()``.

    Returns:
        The bound, listening, non-blocking socket.

    Raises:
        OSError: if the socket cannot be configured, bound or put into
            listening mode; the socket is closed before the error propagates.
    """
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.setblocking(False)
        server.bind(('localhost', port))
        server.listen(max_conn)
    except OSError:
        # Do not leak the descriptor when setup fails (e.g. port in use).
        server.close()
        raise
    return server

def read(conn, mask):
    """Receive up to 1024 bytes from *conn* and print them with the peer address.

    An empty read clears the module-level ``GO_ON`` flag.  *mask* is unused.
    """
    global GO_ON
    peer = conn.getpeername()
    payload = conn.recv(1024)
    print('Got {} from {}'.format(payload, peer))
    if payload:
        return
    GO_ON = False

def accept(sock, mask):
    """Accept a pending connection on *sock* and watch it for incoming data.

    The new connection is made non-blocking and registered on ``m_selector``
    with ``read`` as its callback.  *mask* is unused.
    """
    connection, peer = sock.accept()
    connection.setblocking(False)
    print('Accepting connection from {}'.format(peer))
    m_selector.register(connection, selectors.EVENT_READ, read)

def quit():
    """Announce shutdown and clear the module-level ``GO_ON`` flag."""
    global GO_ON
    GO_ON = False
    print('Exiting...')


def from_keyboard(arg1, arg2):
    """Handle readable keyboard input.

    Reads everything available from *arg1*.  The exact text 'quit' followed
    by a newline calls ``quit()``; anything else is echoed.  *arg2* is unused.
    """
    line = arg1.read()
    if line != 'quit\n':
        print('User input: {}'.format(line))
        return
    quit()

# Run flag: the callbacks clear it to stop the main loop.
GO_ON = True
set_input_nonblocking()

# Server socket on port 10000 with a listen backlog of 10.
server = create_socket(10000, 10)

# Each registration carries its handler as the key's data.
for source, handler in ((server, accept), (sys.stdin, from_keyboard)):
    m_selector.register(source, selectors.EVENT_READ, handler)

while GO_ON:
    sys.stdout.write('>>> ')
    sys.stdout.flush()
    # Dispatch every ready object to the handler stored at registration time.
    for key, events in m_selector.select():
        key.data(key.fileobj, events)


# unregister events
m_selector.unregister(sys.stdin)

# close the listening socket
# shutdown() is deliberately not called: it requires a `how` argument
# (calling it with none raises TypeError), and a listening socket has no
# established connection to shut down.  close() releases it.
server.close()

# close the selector
m_selector.close()

You can test using two terminals. first terminal:

$ python3 test.py 
>>> bla

open another terminal and run:

 $ nc localhost 10000
 hey!

back to the first

>>> qwerqwer     

Result (seen on the main terminal):

$ python3 test.py 
>>> bla
User input: bla

>>> Accepting connection from ('127.0.0.1', 39598)
>>> Got b'hey!\n' from ('127.0.0.1', 39598)
>>> qwerqwer     
User input: qwerqwer

>>> 
like image 35
Zeh Avatar answered Nov 13 '22 03:11

Zeh


#-----------------------------------------------------------------------
# Get a character from the keyboard.  If Block is True wait for input,
# else return any available character or throw an exception if none is
# available.  Ctrl+C isn't handled and continues to generate the usual
# SIGINT signal, but special keys like the arrows return the expected 
# escape sequences.
#
# This requires:
#
#    import sys, select
#
# This was tested using python 2.7 on Mac OS X.  It will work on any
# Linux system, but will likely fail on Windows due to select/stdin
# limitations.
#-----------------------------------------------------------------------

def get_char(block=True):
    """Return one character read from sys.stdin.

    Args:
        block: when true, wait for input; when false, return a character
            only if select() reports stdin readable right now.

    Returns:
        The result of ``sys.stdin.read(1)``.

    Raises:
        select.error: (an alias of OSError on Python 3) with the message
            'NoChar' when ``block`` is false and nothing is available.
    """
    if not block:
        # Zero timeout: poll stdin without waiting.
        readable, _, _ = select.select([sys.stdin], [], [], 0)
        if not readable:
            # A bare `error` name is not defined by `import sys, select`;
            # select.error is the exception class the select module provides.
            raise select.error('NoChar')
    return sys.stdin.read(1)
like image 33
Michael Nagy Avatar answered Nov 13 '22 04:11

Michael Nagy


This is a posix solution, similar to the answer by swdev.

As they stated, you have to play with termios.VMIN and termios.VTIME to catch more than one character without requiring the user to press Enter. Using raw mode alone is a problem, because special keys like the arrows send several characters and can mess up the next keypress.

Here we use tty.setcbreak() or tty.setraw() as a shortcut; their implementations are short, so you could also set the equivalent termios flags by hand.

import termios
import tty
import sys
import select

def get_enter_key():
    """Wait for key presses on stdin and return them without needing Enter.

    Puts the terminal in cbreak mode with VMIN and VTIME set to 0, waits in
    ``select()`` until stdin is readable, then reads up to 4095 characters.
    The terminal attributes captured on entry are re-applied on the way out.

    Returns:
        The characters read, or None if select() reported nothing readable.
    """
    stdin_fd = sys.stdin.fileno()
    saved_attrs = termios.tcgetattr(stdin_fd)
    try:
        tty.setcbreak(stdin_fd)  # use tty.setraw() instead to catch ^C also
        attrs = termios.tcgetattr(stdin_fd)
        control_chars = attrs[6]  # index 6 is the control-character table
        control_chars[termios.VMIN] = 0
        control_chars[termios.VTIME] = 0
        termios.tcsetattr(stdin_fd, termios.TCSAFLUSH, attrs)
        # No timeout argument: wait until the descriptor is readable.
        ready = select.select([stdin_fd], [], [])[0]
        if not ready:
            return None
        return sys.stdin.read(4095)
    finally:
        termios.tcsetattr(stdin_fd, termios.TCSANOW, saved_attrs)

# Print each batch of key presses until a KeyboardInterrupt arrives.
try:
    while True:
        keys = get_enter_key()
        print(keys)
except KeyboardInterrupt:
    print('exiting')
    sys.exit()

Note that there are two potential timeouts you could add here:

  • one is adding last parameter to select.select()
  • another is playing with VMIN and VTIME
like image 3
Arnaudv6 Avatar answered Nov 13 '22 03:11

Arnaudv6