I'm working with an asyncio forever()
eventloop. Now I want to restart the loop (stop the loop and recreate a new loop) after a process or a signal or a change in a file, but I have some problems to do that:
Here are three simplified code snippets in which demonstrate some coroutine workers and a coroutine loop restarter:
#1st try:
import asyncio
async def coro_worker(proc):
print(f'Worker: {proc} started.')
while True:
print(f'Worker: {proc} process.')
await asyncio.sleep(proc)
async def reset_loop(loop):
# Some process
for i in range(5): # Like a process.
print(f'{i} counting for reset the eventloop.')
await asyncio.sleep(1)
main(loop) # Expected close the current loop and start a new loop!
def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
offset = 1 # An offset to change the process name.
for task in asyncio.Task.all_tasks():
print('Cancel the tasks') # Why it increase up?
task.cancel()
# task.clear()
# task.close()
# task.stop()
print("Done cancelling tasks")
asyncio.get_event_loop().stop()
process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()
Out[1]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Done cancelling tasks
Closing Loop
Closing Loop
Task exception was never retrieved
future: <Task cancelling coro=<reset_loop() done, defined at reset_asycio.py:11> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 40, in main
loop.run_forever()
File "/usr/lib/python3.6/asyncio/base_events.py", line 425, in run_forever
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "reset_asycio.py", line 17, in reset_loop
main(loop) # Expected close the current loop and start a new loop!
File "reset_asycio.py", line 48, in main
loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<reset_loop() running at reset_asycio.py:11>>
reset_asycio.py:51: RuntimeWarning: coroutine 'reset_loop' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
reset_asycio.py:51: RuntimeWarning: coroutine 'coro_worker' was never awaited
main()
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:4>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() running at reset_asycio.py:8> wait_for=<Future cancelled>>
#2nd try:
.
.
.
def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
previous_loop.stop()
previous_loop.close()
offset = 1 # An offset to change the process name.
process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()
Out[2]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Closing Loop
Task exception was never retrieved
future: <Task finished coro=<reset_loop() done, defined at reset_asycio.py:9> exception=RuntimeError('Cannot close a running event loop',)>
Traceback (most recent call last):
File "reset_asycio.py", line 15, in reset_loop
main(loop) # Expected close the current loop and start new loop!
File "reset_asycio.py", line 21, in main
previous_loop.close()
File "/usr/lib/python3.6/asyncio/unix_events.py", line 63, in close
super().close()
File "/usr/lib/python3.6/asyncio/selector_events.py", line 96, in close
raise RuntimeError("Cannot close a running event loop")
RuntimeError: Cannot close a running event loop
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f138>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<coro_worker() done, defined at reset_asycio.py:3> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efed846f048>()]>>
#3rd try:
.
.
.
def main(previous_loop=None):
offset = 0
if previous_loop is not None: # Trying for close the last loop if exist.
offset = 1 # An offset to change the process name.
for task in asyncio.Task.all_tasks():
print('Cancel the tasks') # Why it increase up?
task.cancel()
process = [1 + offset, 2 + offset]
loop = asyncio.get_event_loop()
futures = [loop.create_task(coro_worker(proc)) for proc in process]
futures.append(loop.create_task(reset_loop(loop)))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
except asyncio.CancelledError:
print('Tasks has been canceled')
main() # Recursively
finally:
print("Closing Loop")
loop.close()
main()
Out[3]:
Worker: 1 started.
Worker: 1 process.
Worker: 2 started.
Worker: 2 process.
0 counting for reset the eventloop.
Worker: 1 process.
1 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
2 counting for reset the eventloop.
Worker: 1 process.
3 counting for reset the eventloop.
Worker: 2 process.
Worker: 1 process.
4 counting for reset the eventloop.
Worker: 1 process.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
0 counting for reset the eventloop.
1 counting for reset the eventloop.
Worker: 2 process.
2 counting for reset the eventloop.
Worker: 3 process.
3 counting for reset the eventloop.
Worker: 2 process.
4 counting for reset the eventloop.
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Cancel the tasks
Closing Loop
Worker: 2 started.
Worker: 2 process.
Worker: 3 started.
Worker: 3 process.
.
.
.
#Problem:
In the #3rd try, apparently I've done it, but print('Cancel the tasks')
increases up after each restarting, what's the reason?!
Is there a better approach to overcome this problem?
Forgive me for the long question I tried to simplify it!
[NOTE]:
asyncio.timeout()
The recursive call to main()
and the new event loop adds unnecessary complication. Here is a simpler prototype to play with - it monitors an external source (the file system) and, when a file is created, it just stops the loop. main()
contains a loop that takes care of both (re-)creating and cancelling the tasks:
import os, asyncio, random
async def monitor():
loop = asyncio.get_event_loop()
while True:
if os.path.exists('reset'):
print('reset!')
os.unlink('reset')
loop.stop()
await asyncio.sleep(1)
async def work(workid):
while True:
t = random.random()
print(workid, 'sleeping for', t)
await asyncio.sleep(t)
def main():
loop = asyncio.get_event_loop()
loop.create_task(monitor())
offset = 0
while True:
workers = []
workers.append(loop.create_task(work(offset + 1)))
workers.append(loop.create_task(work(offset + 2)))
workers.append(loop.create_task(work(offset + 3)))
loop.run_forever()
for t in workers:
t.cancel()
offset += 3
if __name__ == '__main__':
main()
Another option would be to never even stop the event loop, but to simply trigger a reset event:
async def monitor(evt):
while True:
if os.path.exists('reset'):
print('reset!')
os.unlink('reset')
evt.set()
await asyncio.sleep(1)
In this design main()
can be a coroutine:
async def main():
loop = asyncio.get_event_loop()
reset_evt = asyncio.Event()
loop.create_task(monitor(reset_evt))
offset = 0
while True:
workers = []
workers.append(loop.create_task(work(offset + 1)))
workers.append(loop.create_task(work(offset + 2)))
workers.append(loop.create_task(work(offset + 3)))
await reset_evt.wait()
reset_evt.clear()
for t in workers:
t.cancel()
offset += 3
if __name__ == '__main__':
asyncio.run(main())
# or asyncio.get_event_loop().run_until_complete(main())
Note that in both variants canceling the tasks is implemented by await
raising a CancelledError
exception. The task must not catch all exceptions using try: ... except: ...
and, if it does so, needs to re-raise the exception.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With