
Python multiprocessing with async functions

I built a websocket server, a simplified version of it is shown below:

import websockets, subprocess, asyncio, json, re, os, sys
from multiprocessing import Process

def docker_command(command_words):
    return subprocess.Popen(
        ["docker"] + command_words,
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True)

async def check_submission(websocket:object, submission:dict):
    exercise=submission["exercise"]
    with docker_command(["exec", "-w", "badkan", "grade_exercise", exercise]) as proc:
        for line in proc.stdout:
            print("> " + line)
            await websocket.send(line)

async def run(websocket, path):
    submission_json = await websocket.recv()   # returns a string
    submission = json.loads(submission_json)   # converts the string to a python dict

    ####
    await check_submission(websocket, submission)


websocketserver = websockets.server.serve(run, '0.0.0.0', 8888, origins=None)
asyncio.get_event_loop().run_until_complete(websocketserver)
asyncio.get_event_loop().run_forever()

It works fine when only one user is connected. But when several users connect at the same time, the server handles their submissions serially, so later users have to wait a long time.

I tried to convert it to a multiprocessing server by replacing the line marked with "####" ("await check_submission...") with:

p = Process(target=check_submission, args=(websocket, submission,))
p.start()

But it did not work: I got a RuntimeWarning, "coroutine 'check_submission' was never awaited", and no output came through the websocket.

I also tried to replace these lines with:

loop = asyncio.get_event_loop()
loop.set_default_executor(ProcessPoolExecutor())
await loop.run_in_executor(None, check_submission, websocket, submission)

but got a different error: "can't pickle asyncio.Future objects".

How can I build this multi-processing websocket server?

Erel Segal-Halevi asked Sep 02 '25 16:09


2 Answers

Here is my example. asyncio.run() worked for me: give each process a synchronous entry point that calls asyncio.run() to start the async function.

class FlowConsumer(Base):
    def __init__(self):
        pass

    async def run(self):
        self.logger("start consumer process")
        while True:
            # get flow from queue
            flow = {}
            # call flow executor get result
            executor = FlowExecutor(flow)
            rtn = FlowResult()
            try:
                rtn = await executor.run()
            except Exception as e:
                self.logger("flow run except:{}".format(traceback.format_exc()))
                rtn.status = FLOW_EXCEPT
                rtn.msg = str(e)
            self.logger("consumer flow finish,result:{}".format(rtn.dict()))
            await asyncio.sleep(1)  # pause without blocking this process's event loop

    def process(self):
        asyncio.run(self.run())


processes = []
consumer_proc_count = 3

# start multi consumer processes 
for _ in range(consumer_proc_count):
    # old version
    # p = Process(target=FlowConsumer().run)
    p = Process(target=FlowConsumer().process)
    p.start()
    processes.append(p)

for p in processes:
    p.join()
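
The answer's snippet depends on classes that are not shown (Base, FlowExecutor, FlowResult), so it cannot be run as posted. The same pattern can be sketched in a self-contained form. The names `work`, `process_entry`, and `run_workers` are hypothetical, and `asyncio.sleep` stands in for the real async job:

```python
import asyncio
from multiprocessing import Process, Queue


async def work(worker_id: int, results) -> None:
    """Hypothetical async job; stands in for FlowConsumer.run()."""
    await asyncio.sleep(0.1)        # simulated async I/O
    results.put(worker_id * 10)     # report a value back to the parent


def process_entry(worker_id: int, results) -> None:
    # A Process target must be a plain callable, so this synchronous wrapper
    # creates a fresh event loop in the child and runs the coroutine on it.
    asyncio.run(work(worker_id, results))


def run_workers(count: int) -> list:
    results = Queue()
    procs = [Process(target=process_entry, args=(i, results)) for i in range(count)]
    for p in procs:
        p.start()
    collected = [results.get() for _ in procs]   # drain the queue before joining
    for p in procs:
        p.join()
    return sorted(collected)


if __name__ == "__main__":
    print(run_workers(3))
```

Passing the coroutine function itself as the target only creates a coroutine object in the child and never runs it, which is what produces the "never awaited" warning from the question.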
SkyWay answered Sep 04 '25 16:09


The problem is that subprocess.Popen is not async, so check_submission blocks the event loop while waiting for the next line of docker output.

You don't need multiprocessing at all. Since the time is spent blocked waiting on a subprocess, you only need to switch from subprocess to asyncio.subprocess:

async def docker_command(command_words):
    return await asyncio.subprocess.create_subprocess_exec(
        *["docker"] + command_words,
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)

async def check_submission(websocket:object, submission:dict):
    exercise = submission["exercise"]
    proc = await docker_command(["exec", "-w", "badkan", "grade_exercise", exercise])
    async for line in proc.stdout:
        line = line.decode()   # the asyncio pipe yields bytes; decode to send text as before
        print("> " + line)
        await websocket.send(line)
    await proc.wait()
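
To see the effect without docker or a websocket, here is a minimal sketch under a few assumptions: a short Python child process stands in for the `docker exec` command, a plain list stands in for the websocket, and `stream_lines` and `main` are hypothetical names. Two "users" are served on a single event loop:

```python
import asyncio
import sys


async def stream_lines(tag: str, delay: float, sink: list) -> None:
    # A short Python child process stands in for the `docker exec` call;
    # it prints three lines with a pause after each one.
    child_code = (
        "import sys, time\n"
        "for i in range(3):\n"
        f"    print('{tag}', i, flush=True)\n"
        f"    time.sleep({delay})\n"
    )
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", child_code,
        stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
    async for raw in proc.stdout:             # yields to the event loop between lines
        sink.append(raw.decode().rstrip())    # pipe output is bytes; decode it
    await proc.wait()


async def main() -> list:
    sink = []
    # Both children run at the same time; neither waits for the other to finish.
    await asyncio.gather(
        stream_lines("A", 0.2, sink),
        stream_lines("B", 0.2, sink),
    )
    return sink


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because each `async for` iteration suspends while waiting for the next line, the event loop is free to service the other connection in the meantime.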
user4815162342 answered Sep 04 '25 17:09