
RuntimeError: There is no current event loop in thread in async + apscheduler

I have an async function and need to run it with APScheduler every N minutes. The Python code is below:

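import asyncio
import os

from aiohttp import ClientSession
from apscheduler.schedulers.asyncio import AsyncIOScheduler

URL_LIST = ['<url1>',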
            '<url2>',
            '<url2>',
            ]

def demo_async(urls):
    """Fetch list of web pages asynchronously."""
    loop = asyncio.get_event_loop() # event loop
    future = asyncio.ensure_future(fetch_all(urls)) # tasks to do
    loop.run_until_complete(future) # loop until done

async def fetch_all(urls):
    tasks = [] # one task per url
    async with ClientSession() as session:
        for url in urls:
            task = asyncio.ensure_future(fetch(url, session))
            tasks.append(task) # create list of tasks
        _ = await asyncio.gather(*tasks) # gather task responses

async def fetch(url, session):
    """Fetch a url, using specified ClientSession."""
    async with session.get(url) as response:
        resp = await response.read()
        print(resp)

if __name__ == '__main__':
    scheduler = AsyncIOScheduler()
    scheduler.add_job(demo_async, args=[URL_LIST], trigger='interval', seconds=15)
    scheduler.start()
    print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

    # Execution will block here until Ctrl+C (Ctrl+Break on Windows) is pressed.
    try:
        asyncio.get_event_loop().run_forever()
    except (KeyboardInterrupt, SystemExit):
        pass

But when I try to run it, I get the following error:

Job "demo_async (trigger: interval[0:00:15], next run at: 2017-10-12 18:21:12 +04)" raised an exception.....
..........\lib\asyncio\events.py", line 584, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread '<concurrent.futures.thread.ThreadPoolExecutor object at 0x0356B150>_0'.

Could you please help me with this? I am using Python 3.6 and APScheduler 3.3.1.

Valera Shutylev asked Oct 13 '17 10:10


5 Answers

In your def demo_async(urls), try to replace:

loop = asyncio.get_event_loop()

with:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
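
For illustration, here is a minimal sketch of how demo_async might look with that change when it is called from a worker thread. The fetch_all below is a hypothetical stand-in that does no network I/O, and the try/finally that closes the loop is my own addition rather than part of the suggestion above; it only keeps a fresh loop from being leaked on every scheduled run.

```python
import asyncio
import threading


async def fetch_all(urls):
    # Hypothetical stand-in for the question's fetch_all: no network access.
    await asyncio.sleep(0)
    return [f"fetched {url}" for url in urls]


def demo_async(urls):
    """Run fetch_all on a loop created for the calling thread."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(fetch_all(urls))
    finally:
        # Assumption: one loop per call, so release it when the call ends.
        asyncio.set_event_loop(None)
        loop.close()


# Call demo_async from a non-main thread, as a thread pool executor would.
results = []
worker = threading.Thread(target=lambda: results.append(demo_async(["a", "b"])))
worker.start()
worker.join()
```

Because the loop is created inside the function, it does not matter which thread the scheduler uses to run the job.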

prezha answered Nov 16 '22 23:11


The important thing that hasn't been mentioned is why the error occurs. For me personally, knowing why the error occurs is as important as solving the actual problem.

Let's take a look at the implementation of get_event_loop in BaseDefaultEventLoopPolicy:

class BaseDefaultEventLoopPolicy(AbstractEventLoopPolicy):
    ...

    def get_event_loop(self):
        """Get the event loop.

        This may be None or an instance of EventLoop.
        """
        if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
            self.set_event_loop(self.new_event_loop())
        if self._local._loop is None:
            raise RuntimeError('There is no current event loop in thread %r.'
                               % threading.current_thread().name)
        return self._local._loop

You can see that self.set_event_loop(self.new_event_loop()) is only executed if all of the following conditions are met:

  • self._local._loop is None - _local._loop is not set
  • not self._local._set_called - set_event_loop hasn't been called yet
  • isinstance(threading.current_thread(), threading._MainThread) - current thread is the main one (this is not True in your case)

Therefore the exception is raised, because no loop is set in the current thread:

if self._local._loop is None:
    raise RuntimeError('There is no current event loop in thread %r.'
                       % threading.current_thread().name)
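
The behaviour described above can be reproduced without APScheduler. This is a small sketch using only the standard library; the thread name "worker-0" and the errors list are just illustrative names.

```python
import asyncio
import threading

errors = []


def worker():
    # No loop has been set in this thread and it is not the main thread,
    # so get_event_loop() has nothing to return and raises RuntimeError.
    try:
        asyncio.get_event_loop()
    except RuntimeError as exc:
        errors.append(str(exc))


thread = threading.Thread(target=worker, name="worker-0")
thread.start()
thread.join()

print(errors)
```

The collected message should be the same "There is no current event loop in thread ..." text shown in the question, with the worker thread's name filled in.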

radzak answered Nov 16 '22 23:11


Just pass fetch_all to scheduler.add_job() directly. The asyncio scheduler supports coroutine functions as job targets.

If the target callable is not a coroutine function, it will be run in a worker thread (due to historical reasons), hence the exception.
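
To make the difference visible, here is a stdlib-only sketch. It is not APScheduler's implementation: loop.run_in_executor() is used as a stand-in for "run the plain callable in a worker thread", and plain_job and coroutine_job are hypothetical names. With APScheduler itself, the change would simply be passing fetch_all instead of demo_async to scheduler.add_job().

```python
import asyncio
import threading


def plain_job():
    # A regular function handed to an executor runs in a pool thread,
    # where no event loop has been set.
    return threading.get_ident()


async def coroutine_job():
    # A coroutine is awaited on the loop's own thread.
    return threading.get_ident()


async def main():
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    plain_thread = await loop.run_in_executor(None, plain_job)
    coro_thread = await coroutine_job()
    return loop_thread, plain_thread, coro_thread


loop_thread, plain_thread, coro_thread = asyncio.run(main())
print(plain_thread == loop_thread, coro_thread == loop_thread)
```

The plain function ends up on a different thread from the loop, which is exactly the situation in which asyncio.get_event_loop() fails in the question.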

Alex Grönholm answered Nov 17 '22 01:11


Use asyncio.run() (available since Python 3.7) instead of working with the event loop directly. It creates a new loop and closes it when finished.

This is what the body of asyncio.run() looks like:

if events._get_running_loop() is not None:
    raise RuntimeError(
        "asyncio.run() cannot be called from a running event loop")

if not coroutines.iscoroutine(main):
    raise ValueError("a coroutine was expected, got {!r}".format(main))

loop = events.new_event_loop()
try:
    events.set_event_loop(loop)
    loop.set_debug(debug)
    return loop.run_until_complete(main)
finally:
    try:
        _cancel_all_tasks(loop)
        loop.run_until_complete(loop.shutdown_asyncgens())
    finally:
        events.set_event_loop(None)
        loop.close()
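
Applied to the question, demo_async could shrink to a single call. This is a sketch assuming Python 3.7 or later; fetch_all is again a hypothetical stand-in without network I/O, and the outcome dictionary exists only to carry the result out of the thread.

```python
import asyncio
import threading


async def fetch_all(urls):
    # Hypothetical stand-in for the question's fetch_all.
    await asyncio.sleep(0)
    return len(urls)


def demo_async(urls):
    # asyncio.run() creates a loop, runs the coroutine, then closes the loop.
    return asyncio.run(fetch_all(urls))


outcome = {}


def worker():
    outcome["count"] = demo_async(["u1", "u2", "u3"])


# Works from a non-main thread because no pre-existing loop is required.
thread = threading.Thread(target=worker)
thread.start()
thread.join()
```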

Asaf Pinhassi answered Nov 17 '22 00:11


I had a similar issue where I wanted my asyncio module to be callable from a non-asyncio script (which was running under gevent... don't ask...). The code below resolved my issue: it tries to get the current event loop and creates one if the current thread does not have one. Tested in Python 3.9.11.

try:
    loop = asyncio.get_event_loop()
except RuntimeError as e:
    if str(e).startswith('There is no current event loop in thread'):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    else:
        raise
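
The same idea can be wrapped in a small helper. This is a sketch, not the code above verbatim: get_or_create_event_loop is a hypothetical name, and it catches any RuntimeError from get_event_loop() instead of matching the message text, which is a simplification on my part.

```python
import asyncio
import threading


def get_or_create_event_loop():
    """Return the thread's current loop, creating and setting one if needed."""
    try:
        return asyncio.get_event_loop()
    except RuntimeError:
        # Simplification: treat any RuntimeError here as "no loop in this thread".
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop


seen = []


def worker():
    first = get_or_create_event_loop()   # creates a loop for this thread
    second = get_or_create_event_loop()  # reuses the loop set by the first call
    seen.append(first is second)
    first.close()


thread = threading.Thread(target=worker)
thread.start()
thread.join()
```

Unlike calling new_event_loop() unconditionally, this keeps a single loop per thread across repeated calls.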

ZachL answered Nov 17 '22 01:11