
Python aiohttp + asyncio: How to execute code after loop.run_forever()

Code:

#!/usr/bin/env python

import asyncio
import os
import socket
import time
import traceback
from aiohttp import web
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count

CPU_COUNT = cpu_count()
print("CPU Count:", CPU_COUNT)

def mk_socket(host="127.0.0.1", port=9090, reuseport=False):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    if reuseport:
        # Use the platform's constant instead of hardcoding the Linux value 15.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind((host, port))
    return sock

async def index(request):
    icecast_index_path = os.path.abspath("../test/icecast/icecast_index.html")
    print(icecast_index_path)
    try:
        # Close the file deterministically instead of leaking the handle.
        with open(icecast_index_path, encoding="utf8") as f:
            content = f.read()
        return web.Response(content_type="text/html", text=content)
    except Exception as e:
        return web.Response(content_type="text/html", text="<!doctype html><body><h1>Error: "+str(e)+"</h1></body></html>")

async def start_server():
    try:
        host = "127.0.0.1"
        port=8080
        reuseport = True
        app = web.Application()
        app.add_routes([web.get('/', index)])
        runner = web.AppRunner(app)
        await runner.setup()
        sock = mk_socket(host, port, reuseport=reuseport)
        srv = web.SockSite(runner, sock)
        await srv.start()
        print('Server started at http://127.0.0.1:8080')
        return srv, app, runner
    except Exception:
        traceback.print_exc()
        raise

async def finalize(srv, app, runner):
    # runner.cleanup() stops every site registered on the runner, which
    # closes the listening socket, and then runs the app's cleanup handlers.
    await runner.cleanup()

def init():
    loop = asyncio.get_event_loop()
    srv, app, runner = loop.run_until_complete(start_server())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.run_until_complete((finalize(srv, app, runner)))


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        for i in range(0, int(CPU_COUNT/2)):
            executor.submit(init)

    # After the aiohttp servers start, I want to execute more code,
    # for example:
    print("Hello world.")
    # In the actual program the ProcessPoolExecutor is called
    # inside a PyQt5 app, so I don't want the PyQt5 app to freeze.

The problem is that with this code I can't execute anything after the ProcessPoolExecutor calls.

How can I fix that?

I tried to remove this part:

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        loop.run_until_complete((finalize(srv, app, runner)))

from the init() function, but then the aiohttp server closes immediately.

Edit: If I use a thread instead of ProcessPoolExecutor, I get errors such as:

  1. RuntimeError: set_wakeup_fd only works in main thread
  2. RuntimeError: There is no current event loop in thread
  3. Other aiohttp + asyncio related errors.

Maybe I need to put a decorator (an @ line) above the def declarations, I suppose.

Chris P asked Jun 06 '26 18:06



1 Answer

Using a with block with an Executor blocks your process until the jobs in the executor have completed, because leaving the block calls shutdown(wait=True). Since your jobs run infinite event loops, they never complete, so the with block never exits.
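
The blocking can be reproduced without aiohttp. The sketch below is an illustration, not the code from the question: it uses ThreadPoolExecutor in place of ProcessPoolExecutor so that it runs anywhere without pickling concerns (both inherit their with behaviour from concurrent.futures.Executor), and slow_job is a hypothetical stand-in for init() that returns after a short sleep instead of running forever.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_job(seconds):
    # Hypothetical stand-in for init(): returns after a delay
    # instead of looping forever.
    time.sleep(seconds)
    return seconds


def time_with_block(seconds=0.3):
    """Return (elapsed when submit() returned, elapsed when the with block exited)."""
    start = time.monotonic()
    with ThreadPoolExecutor(max_workers=1) as executor:
        executor.submit(slow_job, seconds)
        after_submit = time.monotonic() - start
    # Leaving the block called executor.shutdown(wait=True),
    # so we only get here once slow_job has finished.
    after_block = time.monotonic() - start
    return after_submit, after_block


if __name__ == "__main__":
    submitted, finished = time_with_block()
    print(f"submit() returned after {submitted:.3f}s")
    print(f"with block exited after {finished:.3f}s")
```

submit() comes back almost at once; the wait happens at the end of the with block. With a job that never returns, that wait never ends.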

Instead, just use the executor to kick off the jobs, and run your stuff afterwards. When you’re finally done, call .shutdown() to wait for processes to exit:

executor = ProcessPoolExecutor()
for i in range(0, int(CPU_COUNT/2)):
    executor.submit(init)
# other code…
executor.shutdown()
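
One caveat: shutdown() waits for the submitted jobs, so workers that loop forever also need some way to be told to stop. Here is a minimal sketch of the whole pattern, with assumptions stated up front. ThreadPoolExecutor stands in for ProcessPoolExecutor to keep the example self-contained, and serve_until_stopped and stop_event are hypothetical names replacing init() and whatever stop signal the real server would use. A threading.Event cannot be shared with worker processes; with ProcessPoolExecutor something like multiprocessing.Manager().Event() would be needed instead.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def serve_until_stopped(stop_event, poll_interval=0.01):
    # Hypothetical stand-in for init(): keeps "serving" until asked to stop.
    ticks = 0
    while not stop_event.is_set():
        time.sleep(poll_interval)
        ticks += 1
    return ticks


def run_with_background_workers(worker_count=2):
    stop_event = threading.Event()
    executor = ThreadPoolExecutor(max_workers=worker_count)

    # submit() only schedules the job and returns a Future immediately.
    futures = [executor.submit(serve_until_stopped, stop_event)
               for _ in range(worker_count)]

    # "Other code" runs here while the workers are still busy.
    still_running = [not f.done() for f in futures]

    # Ask the workers to finish, then wait for them to exit.
    stop_event.set()
    executor.shutdown(wait=True)
    return still_running, [f.done() for f in futures]


if __name__ == "__main__":
    running, done = run_with_background_workers()
    print("running while the main code executed:", running)
    print("finished after shutdown():", done)
```

In a GUI application the stop-and-shutdown step would presumably belong in the exit path, so the window stays responsive while the workers run.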
nneonneo answered Jun 08 '26 06:06



