I'm having difficulty understanding the behaviour of my altered echo server, which attempts to take advantage of Python 3's asyncio module.
Essentially I have an infinite loop (let's say I want to stream some data from the server to the client indefinitely whilst the connection is open), e.g. MyServer.py:
#! /usr/bin/python3
import asyncio
import os
import time


class MyProtocol(asyncio.Protocol):

    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def connection_lost(self, exc):
        asyncio.get_event_loop().stop()

    def data_received(self, data):
        i = 0
        while True:
            self.transport.write(b'>> %i' % i)
            time.sleep(2)
            i += 1


loop = asyncio.get_event_loop()
coro = loop.create_server(MyProtocol,
                          os.environ.get('MY_SERVICE_ADDRESS', 'localhost'),
                          os.environ.get('MY_SERVICE_PORT', 8100))
server = loop.run_until_complete(coro)
try:
    loop.run_forever()
except:
    loop.run_until_complete(server.wait_closed())
finally:
    loop.close()
Next, when I connect with nc ::1 8100 and send some text (e.g. "testing") I get the following:
user@machine$ nc ::1 8100
*** Connection from('::1', 58503, 0, 0) ***
testing
>> 1
>> 2
>> 3
^C
Now when I attempt to connect using nc again, I do not get any welcome message, and after I attempt to send some new text to the server I get an endless stream of the following error:
user@machine$ nc ::1 8100
Is there anybody out there?
socket.send() raised exception
socket.send() raised exception
...
^C
Just to add salt to the wound, the socket.send() raised exception message continues to spam my terminal until I kill the Python server process...
As I'm new to web technologies (been a desktop dinosaur for far too long!), I'm not sure why I am getting the above behaviour, and I haven't got a clue how to produce the intended behaviour, which loosely looks like this: each client that connects should receive the streamed messages for as long as it stays connected, and the server should carry on accepting new connections afterwards.
Any enlightenment would be extremely welcome!
There are multiple problems with the code.
First and foremost, data_received never returns. At the transport/protocol level, asyncio programming is single-threaded and callback-based. Application code is scattered across callbacks like data_received, and the event loop runs the show, monitoring file descriptors and invoking the callbacks as needed. Each callback is only allowed to perform a short calculation, invoke methods on the transport, and arrange for further callbacks to be executed. What the callback cannot do is take a lot of time to complete or block waiting for something. A while loop that never exits is especially bad because it doesn't allow the event loop to run at all.

This is why the code only spits out exceptions once the client disconnects: connection_lost is never called. It's supposed to be called by the event loop, and the never-returning data_received is not giving the event loop a chance to resume. With the event loop blocked, the program is unable to respond to other clients, and data_received keeps trying to send data to the disconnected client, logging its failure to do so.
The correct way to express the idea can look like this:
    def data_received(self, data):
        self.i = 0
        loop.call_soon(self.write_to_client)

    def write_to_client(self):
        self.transport.write(b'>> %i' % self.i)
        self.i += 1
        loop.call_later(2, self.write_to_client)
Note how both data_received and write_to_client do very little work and quickly return. There are no calls to time.sleep(), and definitely no infinite loops - the "loop" is hidden inside the kind-of-recursive call to write_to_client.
This change reveals the second problem in the code. Its MyProtocol.connection_lost stops the whole event loop and exits the program. This renders the program unable to respond to the second client. The fix could be to replace loop.stop() with setting a flag in connection_lost:
    def data_received(self, data):
        self._done = False
        self.i = 0
        loop.call_soon(self.write_to_client)

    def write_to_client(self):
        if self._done:
            return
        self.transport.write(b'>> %i' % self.i)
        self.i += 1
        loop.call_later(2, self.write_to_client)

    def connection_lost(self, exc):
        self._done = True
This allows multiple clients to connect.
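Putting the two fixes together, a complete callback-based version of the server could look roughly like this. This is only a sketch based on the question's code: it keeps the same environment variables and starts the periodic writes from data_received, exactly as in the snippets above.

#! /usr/bin/python3
import asyncio
import os


class MyProtocol(asyncio.Protocol):

    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def connection_lost(self, exc):
        # mark only this connection as finished; the event loop keeps running
        self._done = True

    def data_received(self, data):
        self._done = False
        self.i = 0
        # schedule the first write and return immediately
        loop.call_soon(self.write_to_client)

    def write_to_client(self):
        if self._done:
            return
        self.transport.write(b'>> %i' % self.i)
        self.i += 1
        # re-arm this callback instead of blocking with time.sleep()
        loop.call_later(2, self.write_to_client)


loop = asyncio.get_event_loop()
coro = loop.create_server(MyProtocol,
                          os.environ.get('MY_SERVICE_ADDRESS', 'localhost'),
                          os.environ.get('MY_SERVICE_PORT', 8100))
server = loop.run_until_complete(coro)
loop.run_forever()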
An even better option is to use coroutines and streams. Coroutines allow writing natural-looking code that contains loops and looks like it contains blocking calls, which under the hood are converted into suspension points that enable the event loop to resume. Using streams, the code from the question would look like this:
async def talk_to_client(reader, writer):
    peername = writer.get_extra_info('peername')
    print('Connection from {}'.format(peername))
    data = await reader.read(1024)
    i = 0
    while True:
        writer.write(b'>> %i' % i)
        await writer.drain()
        await asyncio.sleep(2)
        i += 1


loop = asyncio.get_event_loop()
coro = asyncio.start_server(talk_to_client,
                            os.environ.get('MY_SERVICE_ADDRESS', 'localhost'),
                            os.environ.get('MY_SERVICE_PORT', 8100))
server = loop.run_until_complete(coro)
loop.run_forever()
talk_to_client looks very much like the original implementation of data_received, but without the drawbacks. At each point where it uses await, control returns to the event loop whenever the data it needs is not yet available. time.sleep(n) is replaced with await asyncio.sleep(n), which does the equivalent of loop.call_later(n, <resume current coroutine>). Awaiting writer.drain() ensures that the coroutine pauses when the peer cannot process the output it gets, and that it raises an exception when the peer has disconnected.
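Because writer.drain() raises once the peer has gone away, the write loop can catch that exception and exit, which also lets the server keep serving new connections. Below is a minimal sketch of that idea using the asyncio.run() / serve_forever() style available from Python 3.7 onwards; the exact exception caught (ConnectionResetError here) is what typically surfaces on an abrupt disconnect and may vary by platform.

import asyncio
import os


async def talk_to_client(reader, writer):
    peername = writer.get_extra_info('peername')
    print('Connection from {}'.format(peername))
    data = await reader.read(1024)
    i = 0
    try:
        while True:
            writer.write(b'>> %i' % i)
            # drain() typically raises (e.g. ConnectionResetError) once the peer is gone
            await writer.drain()
            await asyncio.sleep(2)
            i += 1
    except ConnectionResetError:
        print('Client {} disconnected'.format(peername))
    finally:
        writer.close()


async def main():
    server = await asyncio.start_server(
        talk_to_client,
        os.environ.get('MY_SERVICE_ADDRESS', 'localhost'),
        os.environ.get('MY_SERVICE_PORT', 8100))
    async with server:
        await server.serve_forever()


asyncio.run(main())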