The code below works, but I'm a bit confused by the:
DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
and the
DeprecationWarning: There is no current event loop
loop.run_until_complete(asyncio.gather(*coroutine_list))
warnings.
I've found this and this but what I'm confused about is how to update my code. I understand that there isn't an even loop yet, but how do I set it? I assumed that I needed to change asyncio.get_event_loop() with asyncio.new_event_loop() but this doesn't seem to work.
This is my code
async def flush(self, queue_c) -> None:
try:
loop = asyncio.get_running_loop()
result_objects_action = loop.run_in_executor(None, self.create_objects, queue_c)
# do stuff
except:
# except handling
def _run_queue(self) -> None:
coroutine_list = []
c = 0
while c < len(self._queue):
coroutine_list.append(self.flush(c))
c+=1
# What should be changed here?
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*coroutine_list))
self._queue = []
Calling asyncio.new_event_loop() isn't enough, you need to set it after with asyncio.set_event_loop(loop). This deprecation warning for get_event_loop() was introduced by python 3.10 i believe. Here's how i setup the event loop so that it works with any python 3.x version:
import sys
import asyncio
if sys.version_info < (3, 10):
loop = asyncio.get_event_loop()
else:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With