Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Python Asyncio errors: "OSError: [WinError 6] The handle is invalid" and "RuntimeError: Event loop is closed" [duplicate]

I am having some difficulties with properly with my code, as I get the following error after my code finishes executing while debugging on VSCode:

Exception ignored in: <function _ProactorBasePipeTransport.__del__ at 0x00000188AB3259D0>
Traceback (most recent call last):
  File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\proactor_events.py", line 116, in __del__
    self.close()
  File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\proactor_events.py", line 108, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\base_events.py", line 719, in call_soon
    self._check_closed()
  File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

I also get the following error if I run the code using the command line:

Cancelling an overlapped future failed
future: <_OverlappedFuture pending overlapped=<pending, 0x25822633550> cb=[_ProactorReadPipeTransport._loop_reading()]>
Traceback (most recent call last):
  File "c:\users\gam3p\appdata\local\programs\python\python38\lib\asyncio\windows_events.py", line 66, in _cancel_overlapped
    self._ov.cancel()
OSError: [WinError 6] The handle is invalid

The code below is an asynchronous API wrapper for Reddit. Since Reddit asks bots to be limited to a rate of 60 requests per minute, I have decided to implement a throttled loop that processes the requests in a queue, in a separate thread.

To run it, you would need to create a Reddit app and use your login credentials as well as the bot ID and secret.

If it helps, I am using Python 3.8.3 64-bit on Windows 10.

import requests
import aiohttp
import asyncio
import threading
from types import SimpleNamespace
from time import time

oauth_url = 'https://oauth.reddit.com/'
base_url = 'https://www.reddit.com/'

agent = 'windows:reddit-async:v0.1 (by /u/UrHyper)'


class Reddit:
    def __init__(self, username: str, password: str, app_id: str, app_secret: str):
        data = {'grant_type': 'password',
                'username': username, 'password': password}
        auth = requests.auth.HTTPBasicAuth(app_id, app_secret)
        response = requests.post(base_url + 'api/v1/access_token',
                                 data=data,
                                 headers={'user-agent': agent},
                                 auth=auth)
        self.auth = response.json()

        if 'error' in self.auth:
            msg = f'Failed to authenticate: {self.auth["error"]}'
            if 'message' in self.auth:
                msg += ' - ' + self.auth['message']
            raise ValueError(msg)

        token = 'bearer ' + self.auth['access_token']
        self.headers = {'Authorization': token, 'User-Agent': agent}

        self._rate = 1
        self._last_loop_time = 0.0
        self._loop = asyncio.new_event_loop()
        self._queue = asyncio.Queue(0)
        self._end_loop = False
        self._loop_thread = threading.Thread(target=self._start_loop_thread)
        self._loop_thread.start()

    def stop(self):
        self._end_loop = True

    def __del__(self):
        self.stop()

    def _start_loop_thread(self):
        asyncio.set_event_loop(self._loop)
        self._loop.run_until_complete(self._process_queue())

    async def _process_queue(self):
        while True:
            if self._end_loop and self._queue.empty():
                await self._queue.join()
                break
            start_time = time()
            if self._last_loop_time < self._rate:
                await asyncio.sleep(self._rate - self._last_loop_time)
            try:
                queue_item = self._queue.get_nowait()
                url = queue_item['url']
                callback = queue_item['callback']
                data = await self._get_data(url)
                self._queue.task_done()
                callback(data)
            except asyncio.QueueEmpty:
                pass
            finally:
                self._last_loop_time = time() - start_time

    async def _get_data(self, url):
        async with aiohttp.ClientSession() as session:
            async with session.get(url, headers=self.headers) as response:
                assert response.status == 200
                data = await response.json()
                data = SimpleNamespace(**data)
                return data

    async def get_bot(self, callback: callable):
        url = oauth_url + 'api/v1/me'
        await self._queue.put({'url': url, 'callback': callback})

    async def get_user(self, user: str, callback: callable):
        url = oauth_url + 'user/' + user + '/about'
        await self._queue.put({'url': url, 'callback': callback})


def callback(data): print(data['name'])


async def main():
    reddit = Reddit('', '', '', '')

    await reddit.get_bot(lambda bot: print(bot.name))
    reddit.stop()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
like image 637
UrHyper Avatar asked Jun 16 '20 16:06

UrHyper


1 Answers

I ran into a similar problem with asyncio.

Since Python 3.8 they change the default event loop on Windows to ProactorEventLoop instead of SelectorEventLoop and their are some issues with it.

so adding

asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

above

loop = asyncio.get_event_loop()

Will get the old eventloop back without issues.

like image 137
Gerrit Geeraerts Avatar answered Nov 03 '22 05:11

Gerrit Geeraerts