Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

asyncio server and client to handle input from console

I have an asyncio TCP server that takes messages from a client, does stuff() on the server and sends text back. The server works well in the sense that it receives and sends data correctly. The problem is that I can't get messages back from the server in the client, because the client is blocked in the routine that reads input from the console (basically the data_received method is never executed). Only the exit command works fine (it closes the loop). How can I solve this? This is the server and client code. It's basically the asyncio EchoClient example with some more plumbing code for an exercise.

# client.py
import abc
import asyncio
import sys

# Menu text printed once by show_menu(); the letters are the keys that
# CommandFactory.get_cmd() looks up.
MENU = '''
a) do x
b) do y
c) exit
'''

# Event loop shared by ExitCommand and main().
loop_ = asyncio.get_event_loop()


class XCommand:
    """Client-side command that sends the ``X:`` request to the server.

    This class has no base class to inherit a constructor from, so it
    stores the client itself; ``show_menu`` instantiates commands as
    ``cmd_cls(client)``.
    """

    def __init__(self, client):
        """Remember *client*, an object exposing ``send_data_to_tcp(str)``."""
        self.client = client

    def run(self):
        """Send the parameterless ``X:`` request through the client."""
        self.client.send_data_to_tcp('X:')  # the client encodes it to bytes


class YCommand(Command):
    """Client-side command for the ``Y`` request, which carries one argument."""

    def run(self):
        """Prompt on the console for the argument and send ``Y:<argument>``."""
        # input() blocks the calling thread until the user presses Enter.
        request = 'Y:' + input('Input for Y ###  ')
        self.client.send_data_to_tcp(request)


class ExitCommand(Command):
    """Client-side command that sends ``EXIT:`` and terminates the client."""

    def run(self):
        """Send the request, close the shared event loop and end the process.

        Raises:
            SystemExit: always, to terminate the program.
        """
        self.client.send_data_to_tcp('EXIT:')
        print('Goodbye!')
        loop_.close()
        # sys.exit() instead of the bare exit(): the latter is a helper that
        # the site module installs for interactive use and is not guaranteed
        # to exist in every interpreter configuration.
        sys.exit()


class CommandFactory:
    """Maps the menu letters typed by the user to client command classes."""

    # Menu letter -> command class defined in this client module.
    _cmds = {
        'a': XCommand,
        'b': YCommand,
        'c': ExitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Return the command class registered for *cmd*, or ``None``.

        Args:
            cmd: the text typed at the prompt; it is looked up as-is.
        """
        return cls._cmds.get(cmd)


def show_menu(client):
    """Print the menu, then read and execute console commands forever.

    Each accepted command class is instantiated with *client* and run.
    The loop has no exit of its own and every read blocks on ``input()``.
    """
    print(MENU)
    while True:
        typed = input('Insert Command$: ')
        handler_cls = CommandFactory.get_cmd(typed)
        if handler_cls:
            handler_cls(client).run()
        else:
            print('Unknown: {}'.format(typed))


class Client(asyncio.Protocol):
    """Protocol that prints what the server sends and writes text requests."""

    def __init__(self, loop):
        """Keep *loop* so it can be stopped when the connection is lost."""
        self.transport = None  # assigned in connection_made()
        self.loop = loop

    def connection_made(self, transport):
        """Remember the transport used later by send_data_to_tcp()."""
        self.transport = transport

    def data_received(self, data):
        """Decode the received bytes and print their repr immediately."""
        text = data.decode()
        print('Data received from server: \n{!r}'.format(text), flush=True)

    def send_data_to_tcp(self, data):
        """Encode the string *data* and write it to the transport."""
        payload = data.encode()
        self.transport.write(payload)

    def connection_lost(self, exc):
        """Report the closed connection and stop the event loop."""
        for line in ('The server closed the connection', 'Stop the event loop'):
            print(line)
        self.loop.stop()


def main():
    """Connect to 127.0.0.1:10888, start the console menu and run the loop."""
    client = Client(loop_)
    connecting = loop_.create_connection(lambda: client, '127.0.0.1', 10888)
    loop_.run_until_complete(connecting)

    # NOTE: show_menu(client) is *called* here, on this thread, before the
    # event loop starts running. Its input() loop never returns, so
    # run_in_executor() and run_forever() below are not reached and
    # Client.data_received() is never dispatched. Handing the callable to the
    # executor instead would run the commands on a worker thread, where they
    # would touch the loop and the transport from outside the loop's thread.
    menu_result = show_menu(client)
    loop_.run_in_executor(None, menu_result)

    loop_.run_forever()
    loop_.close()


if __name__ == '__main__':
    main()


# server.py
import abc
import asyncio
import sys
from asyncio_exercise.db import DB


class ACommand:
    """Server-side handler that renders the result of ``db.a()`` as text."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Return one ``key: value`` line per item of ``db.a()``.

        *param1* and *param2* are accepted for a uniform handler signature
        and are not used. A falsy result yields the fixed "Empty" banner.
        """
        record = db.a()
        if record:
            lines = ['{}: {}'.format(key, value) for key, value in record.items()]
            return '\n'.join(lines)
        return '>>>>>>>>>>> Empty <<<<<<<<<<<<<'


class BCommand:
    """Server-side handler that forwards both parameters to ``db.b()``."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Call ``db.b(param1, param2)`` and return the fixed confirmation."""
        confirmation = 'B Ok!'
        db.b(param1, param2)
        return confirmation


class ExitCommand:
    """Server-side handler that terminates the server process."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Print the farewell message and exit with status 0.

        This handler is invoked from ``Server.data_received``, i.e. while the
        event loop is running. Closing the loop or calling
        ``run_until_complete`` at that point raises ``RuntimeError``, so no
        cleanup is attempted here. The ``finally`` clause of the ``__main__``
        block closes the server and the loop once ``SystemExit`` leaves
        ``loop.run_forever()``.

        Args:
            db, param1, param2: accepted for a uniform handler signature;
                not used.

        Raises:
            SystemExit: always, with exit code 0.
        """
        print('Buona giornata!!!')
        sys.exit(0)

class CommandFactory:
    """Parses a ``NAME[:param1[:param2]]`` message into a handler and params."""

    # Command name (text before the first ':') -> handler class.
    _cmds = {
        'X': ACommand,
        'Y': BCommand,
        'EXIT': ExitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Split *cmd* on ``':'`` and return ``(handler_cls, param1, param2)``.

        ``handler_cls`` is ``None`` for an unregistered name. Both parameters
        are filled only when the message has exactly three fields; otherwise
        just the first extra field (if any) is kept as ``param1``.
        """
        name, *extra = cmd.split(':')
        if not extra:
            param1 = param2 = None
        elif len(extra) == 2:
            param1, param2 = extra
        else:
            param1, param2 = extra[0], None
        return cls._cmds.get(name), param1, param2


class Server(asyncio.Protocol):
    """Protocol that runs one command per received message and replies."""

    db_filename = '../data/db'
    # Created once when the class is defined; every connection uses it.
    db = DB(db_filename)

    def connection_made(self, transport):
        """Log the peer address and keep the transport for replies."""
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        """Decode *data*, run the matching command and write its result back.

        A message whose command name is not registered is answered with an
        ``Unknown: ...`` text instead of failing on a missing handler.
        """
        message = data.decode()
        print('Data received: {!r}'.format(message))
        cmd_cls, param1, param2 = CommandFactory.get_cmd(message)
        if cmd_cls is None:
            # get_cmd() returns None as the handler for unregistered names.
            res = 'Unknown: {}'.format(message)
        else:
            res = cmd_cls.run(self.db, param1, param2)
        print('Sending response: {!r}'.format(res))
        self.transport.write(bytes(res, encoding='UTF-8'))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Passing the class as the factory gives every connection its own
    # Server instance.
    coro = loop.create_server(Server, '127.0.0.1', 10888)
    server = loop.run_until_complete(coro)

    # Run until the user interrupts with Ctrl+C.
    listening_on = server.sockets[0].getsockname()
    print('Serving on {}'.format(listening_on))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # Shut the listening server down, then the loop itself.
        server.close()
        loop.run_until_complete(server.wait_closed())
        loop.close()

UPDATE: The solution was to use the aioconsole package and its ainput function. Below is the code using aioconsole (it works very well).

# server.py
import abc
import asyncio
from d_1_networking.esercizio_soluzione.SOversion.dummydb import DummyDB as DB


class Command(metaclass=abc.ABCMeta):
    """Abstract base class of the server-side command handlers."""

    # classmethod + abstractmethod replaces the deprecated
    # abc.abstractclassmethod; the signature matches the concrete handlers.
    @classmethod
    @abc.abstractmethod
    def run(cls, db, param1=None, param2=None):
        """Execute the command and return the text to send to the client.

        Args:
            db: database object the handler operates on.
            param1: first optional field parsed from the message.
            param2: second optional field parsed from the message.

        Raises:
            NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError()


class XCommand(Command):
    """Handler that renders the result of ``db.x()`` as text."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Return one ``key: value`` line per item of ``db.x()``.

        The two parameters are not used. A falsy result yields the fixed
        "Empty response!" banner.
        """
        record = db.x()
        if record:
            lines = ['{}: {}'.format(key, value) for key, value in record.items()]
            return '\n'.join(lines)
        return '>>>>>>>>>>> Empty response! <<<<<<<<<<<<<'


class YCommand(Command):
    """Handler that passes the first parameter to ``db.y()``."""

    @classmethod
    def run(cls, db, param1=None, param2=None):
        """Call ``db.y(param1)`` and return a confirmation naming *param1*."""
        db.y(param1)
        confirmation = 'Operation Y OK: {}'.format(param1)
        return confirmation


class QuitCommand(Command):
    """Handler for the ``DISCONNECT`` request."""

    @classmethod
    def run(cls, rubrica_db, param1=None, param2=None):
        """Return the fixed farewell text; none of the arguments is used."""
        farewell = 'Disconnected...'
        return farewell

class CommandFactory:
    """Parses a ``NAME[:param1[:param2]]`` message into a handler and params."""

    # Command name (text before the first ':') -> handler class.
    _cmds = {
        'X': XCommand,
        'Y': YCommand,
        'DISCONNECT': QuitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Split *cmd* on ``':'`` and return ``(handler_cls, first, second)``.

        ``handler_cls`` is ``None`` for an unregistered name. Both values are
        filled only when the message has exactly three fields; otherwise just
        the first extra field (if any) is kept.
        """
        name, *extra = cmd.split(':')
        if not extra:
            first = second = None
        elif len(extra) == 2:
            first, second = extra
        else:
            first, second = extra[0], None
        return cls._cmds.get(name), first, second

class Server(asyncio.Protocol):
    """Protocol that runs one command per received message and replies."""

    db_filename = '../data/exercise.db'
    # Created once when the class is defined; every connection uses it.
    db = DB(db_filename)

    def connection_made(self, transport):
        """Log the peer address and keep the transport for replies."""
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        """Decode *data*, run the matching command and write its result back.

        A message whose command name is not registered is answered with an
        ``Unknown: ...`` text instead of failing on a missing handler.
        """
        message = data.decode()
        print('Data received: {!r}'.format(message))
        cmd_cls, param1, param2 = CommandFactory.get_cmd(message)
        if cmd_cls is None:
            # get_cmd() returns None as the handler for unregistered names.
            res = 'Unknown: {}'.format(message)
        else:
            res = cmd_cls.run(self.db, param1, param2)
        print('Sending response: {!r}'.format(res))
        self.transport.write(bytes(res, encoding='UTF-8'))

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Each client connection will create a new protocol instance.
    # The protocol class defined in this module is ``Server``.
    coro = loop.create_server(Server, '127.0.0.1', 10888)
    server = loop.run_until_complete(coro)

    # Serve requests until Ctrl+C is pressed
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass

    # Close the server
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()

#dummydb.py
class DummyDB:
    """Stand-in database that returns canned data."""

    def __init__(self, fn):
        """Store the file name *fn*; nothing is opened or read."""
        self.fn = fn

    def x(self):
        """Return a fixed three-field record as a new dict."""
        return dict(
            field_a='55 tt TTYY 3334 gghyyujh',
            field_b='FF hhhnneeekk',
            field_c='00993342489048222 news',
        )

    def y(self, param):
        """Return *param* unchanged."""
        return param

# client.py
import abc
from asyncio import *
from aioconsole import ainput

# Menu text printed by menu(); the letters are the keys that
# CommandFactory.get_cmd() looks up.
MENU = '''
---------------------------
A) Command X
B) Command Y (require additional input)
C) Quit program
---------------------------
'''

# Event loop handed to Client and driven by the __main__ block.
loop_ = get_event_loop()


class Command(metaclass=abc.ABCMeta):
    """Abstract base class of the client-side menu commands."""

    # True on subclasses whose run() is a coroutine that must be awaited.
    asyn = False

    def __init__(self, tcp_client):
        """Keep *tcp_client*, the object used by run() to reach the server."""
        self.client = tcp_client

    @abc.abstractmethod
    def run(self):
        """Execute the command; concrete subclasses must override this."""
        raise NotImplementedError()


class ACommand(Command):
    """Menu command ``A``: asks the server to run its ``X`` command."""

    def run(self):
        """Send the parameterless ``X:`` request through the client."""
        request = 'X:'
        self.client.send_data_to_tcp(request)


class BCommand(Command):
    """Menu command ``B``: sends the ``Y`` request with user-supplied data."""

    # run() is a coroutine, so the caller has to await it.
    asyn = True

    async def run(self):
        """Await one line of console input and send it as ``Y:<input>``."""
        extra = await ainput('Insert data for B operation (es. name:43d3HHte3) > ')
        request = 'Y:' + extra
        self.client.send_data_to_tcp(request)


class QuitCommand(Command):
    """Menu command ``C``: notifies the server and terminates the client."""

    def run(self):
        """Send ``DISCONNECT:``, stop the client's loop and exit the program."""
        tcp_client = self.client
        tcp_client.send_data_to_tcp('DISCONNECT:')
        print('Goodbye!!!')
        tcp_client.disconnect()
        exit()


class CommandFactory:
    """Maps the menu letters typed by the user to client command classes."""

    # Menu letter -> command class; lookups are case-sensitive.
    _cmds = {
        'A': ACommand,
        'B': BCommand,
        'C': QuitCommand,
    }

    @classmethod
    def get_cmd(cls, cmd):
        """Return the class registered for *cmd* (whitespace-stripped), or ``None``."""
        return cls._cmds.get(cmd.strip())


class Client(Protocol):
    """Protocol that prints what the server sends and writes text requests."""

    def __init__(self, loop):
        """Keep *loop* so that it can be stopped later."""
        self.transport = None  # assigned in connection_made()
        self.loop = loop

    def disconnect(self):
        """Stop the event loop on the user's request."""
        self.loop.stop()

    def connection_made(self, transport):
        """Remember the transport used later by send_data_to_tcp()."""
        self.transport = transport

    def data_received(self, data):
        """Decode the received bytes and print them between separator lines."""
        text = data.decode()
        print('Data received from server: \n===========\n{}\n===========\n'.format(text), flush=True)

    def send_data_to_tcp(self, data):
        """Encode the string *data* and write it to the transport."""
        payload = data.encode()
        self.transport.write(payload)

    def connection_lost(self, exc):
        """Report the closed connection and stop the event loop."""
        for line in ('The server closed the connection', 'Stop the event loop'):
            print(line)
        self.loop.stop()


def menu():
    """Print the command menu text (``MENU``) to the console."""
    print(MENU)


async def main():
    """Show the menu, then read and execute console commands forever.

    Input is awaited with ``ainput``. Commands are built with the
    module-level ``client``; the result of ``run()`` is awaited only for
    command classes whose ``asyn`` flag is true.
    """
    menu()
    while True:
        typed = await ainput('Insert Command >')
        handler_cls = CommandFactory.get_cmd(typed)
        if not handler_cls:
            print('Unknown: {}'.format(typed))
            continue
        outcome = handler_cls(client).run()
        if handler_cls.asyn:
            # run() returned a coroutine object; drive it to completion.
            await outcome


if __name__ == '__main__':
    # ``client`` is a module-level name on purpose: main() reads it when it
    # instantiates commands.
    client = Client(loop_)
    # The lambda makes create_connection() use this one Client instance.
    coro = loop_.create_connection(lambda: client, '127.0.0.1', 10888)
    # First establish the connection, then run the interactive menu coroutine.
    loop_.run_until_complete(coro)
    loop_.run_until_complete(main())
like image 615
BangTheBank Avatar asked Oct 06 '16 17:10

BangTheBank


People also ask

How do you await user input in Python?

We can use input() function to achieve this. In this case, the program will wait indefinitely for the user input. Once the user provides the input data and presses the enter key, the program will start executing the next statements. sec = input('Let us wait for user input.

What is Asyncio Get_event_loop ()?

asyncio.get_event_loop(): Get the current event loop. If there is no current event loop set in the current OS thread, the OS thread is main, and set_event_loop() has not yet been called, asyncio will create a new event loop and set it as the current one.

Is Asyncio a concurrency?

What is asyncio? Asyncio stands for asynchronous input output and refers to a programming paradigm which achieves high concurrency using a single thread or event loop. The model isn't novel to Python and is implemented in other languages and frameworks too, the most prominent being JavaScript's NodeJS.

What is StreamReader in Python?

It defines read and other respective methods to read the data from the stream and "decode" them. The class exposes all other methods of the stream instance. Pseudocode of the codecs.StreamReader definition: `class StreamReader(Codec): def __init__(self, stream): ...`


1 Answer

You could consider using aioconsole.ainput:

from aioconsole import ainput

async def some_coroutine():
    line = await ainput(">>> ")
    [...]

The project is available on PyPI.

like image 156
Vincent Avatar answered Oct 15 '22 22:10

Vincent