I built a websocket server; a simplified version of it is shown below:
import websockets, subprocess, asyncio, json, re, os, sys
from multiprocessing import Process

def docker_command(command_words):
    return subprocess.Popen(
        ["docker"] + command_words,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)

async def check_submission(websocket: object, submission: dict):
    exercise = submission["exercise"]
    with docker_command(["exec", "-w", "badkan", "grade_exercise", exercise]) as proc:
        for line in proc.stdout:
            print("> " + line)
            await websocket.send(line)

async def run(websocket, path):
    submission_json = await websocket.recv()   # returns a string
    submission = json.loads(submission_json)   # converts the string to a Python dict
    ####
    await check_submission(websocket, submission)

websocketserver = websockets.server.serve(run, '0.0.0.0', 8888, origins=None)
asyncio.get_event_loop().run_until_complete(websocketserver)
asyncio.get_event_loop().run_forever()
It works fine when there is only a single user at a time. But when several users try to use the server, it processes them serially, so later users have to wait a long time.
I tried to convert it to a multiprocessing server by replacing the line marked with "####" ("await check_submission...") with:
p = Process(target=check_submission, args=(websocket, submission,))
p.start()
But it did not work: I got a RuntimeWarning, "coroutine 'check_submission' was never awaited", and I did not see any output coming through the websocket.
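For illustration, here is a minimal standalone script (with a hypothetical work coroutine) that reproduces the same warning; as far as I can tell, the child process calls the async function, which only builds a coroutine object that nothing ever awaits:

# minimal reproduction of the warning, with a hypothetical coroutine;
# the child calls work(), which just creates a coroutine object --
# nothing awaits it, hence the RuntimeWarning at child exit
import asyncio
from multiprocessing import Process

async def work():
    print("this never runs")

if __name__ == "__main__":
    p = Process(target=work)
    p.start()
    p.join()   # RuntimeWarning: coroutine 'work' was never awaited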
I also tried to replace these lines with:
from concurrent.futures import ProcessPoolExecutor   # import needed for this attempt

loop = asyncio.get_event_loop()
loop.set_default_executor(ProcessPoolExecutor())
await loop.run_in_executor(None, check_submission, websocket, submission)
but got a different error: "can't pickle asyncio.Future objects".
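As far as I understand, the error comes from the websocket argument itself: run_in_executor with a ProcessPoolExecutor must pickle every argument to ship it to the worker process, and a live websocket connection holds asyncio futures and transports, which cannot be pickled. A tiny sketch showing that even a bare Future fails:

# sketch: a bare asyncio.Future cannot be pickled, so neither can
# objects (like a websocket connection) that contain one
import asyncio, pickle

async def main():
    fut = asyncio.get_running_loop().create_future()
    pickle.dumps(fut)   # raises TypeError: cannot pickle Future
                        # (exact wording varies by Python version)

asyncio.run(main())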
How can I build this multi-processing websocket server?
This is my example: asyncio.run() worked for me to start an async function from a separate process.
import asyncio
import time
import traceback
from multiprocessing import Process

# Base, FlowExecutor, FlowResult and FLOW_EXCEPT are classes/constants
# from my own codebase; it is the pattern that matters, not these names.
class FlowConsumer(Base):
    def __init__(self):
        pass

    async def run(self):
        self.logger("start consumer process")
        while True:
            # get flow from queue
            flow = {}
            # call flow executor to get the result
            executor = FlowExecutor(flow)
            rtn = FlowResult()
            try:
                rtn = await executor.run()
            except Exception as e:
                self.logger("flow run except:{}".format(traceback.format_exc()))
                rtn.status = FLOW_EXCEPT
                rtn.msg = str(e)
            self.logger("consumer flow finish,result:{}".format(rtn.dict()))
            time.sleep(1)   # blocking sleep; asyncio.sleep(1) would be the non-blocking equivalent

    def process(self):
        # sync wrapper: Process can call this, and it runs the coroutine
        asyncio.run(self.run())

if __name__ == "__main__":
    processes = []
    consumer_proc_count = 3
    # start multiple consumer processes
    for _ in range(consumer_proc_count):
        # old version
        # p = Process(target=FlowConsumer().run)
        p = Process(target=FlowConsumer().process)
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
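Reduced to a bare sketch (the names here are made up), the pattern is simply: give Process a synchronous target that calls asyncio.run() on the coroutine, so each child process gets its own event loop:

# distilled pattern, hypothetical names: Process needs a *sync* target,
# so wrap the coroutine in a plain function that calls asyncio.run()
import asyncio
from multiprocessing import Process

async def consume(n):
    print("consumer {} running".format(n))

def start(n):
    asyncio.run(consume(n))   # each child process runs its own event loop

if __name__ == "__main__":
    procs = [Process(target=start, args=(i,)) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()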
The problem is that subprocess.Popen is not async, so check_submission blocks the event loop while waiting for the next line of docker output.
You don't need to use multiprocessing at all; since you are only blocking while waiting on a subprocess, you just need to switch from subprocess to asyncio.subprocess:
async def docker_command(command_words):
    # asyncio's subprocess support: the returned process exposes async streams
    return await asyncio.subprocess.create_subprocess_exec(
        "docker", *command_words,
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)

async def check_submission(websocket: object, submission: dict):
    exercise = submission["exercise"]
    proc = await docker_command(["exec", "-w", "badkan", "grade_exercise", exercise])
    async for line in proc.stdout:   # yields bytes without blocking the event loop
        print(b"> " + line)
        await websocket.send(line)
    await proc.wait()
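With this change the handler yields back to the event loop on every line, so a single process can serve several users concurrently. As a rough sanity check (just a sketch; the exercise names are made up, and the port matches your code), two clients submitting at once should both see output streaming at the same time:

# sketch of a concurrency check: two clients submit simultaneously
# and each should receive its grading output interleaved, not serially
import asyncio, json, websockets

async def submit(exercise):
    async with websockets.connect("ws://localhost:8888") as ws:
        await ws.send(json.dumps({"exercise": exercise}))
        async for line in ws:   # iterate until the server closes the connection
            print(exercise, line)

async def main():
    await asyncio.gather(submit("ex1"), submit("ex2"))

asyncio.run(main())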