
grequests pool with multiple requests.session?

I want to make a lot of URL requests to a REST web service, typically between 75k and 90k. However, I need to throttle the number of concurrent connections to the web service.

I started playing around with grequests in the following manner, but it quickly started chewing up open sockets.

concurrent_limit = 30
urls = buildUrls()
hdrs = {'Host' : 'hostserver'}
g_requests = (grequests.get(url, headers=hdrs) for url in urls)
g_responses = grequests.map(g_requests, size=concurrent_limit)

After this runs for a minute or so, I get hit with 'maximum number of sockets reached' errors. As far as I can tell, each one of the requests.get calls in grequests uses its own session, which means a new socket is opened for each request.

I found a note on GitHub describing how to make grequests use a single session. But this seems to effectively bottleneck all requests into a single shared pool, which seems to defeat the purpose of asynchronous HTTP requests.

s = requests.session()
rs = [grequests.get(url, session=s) for url in urls]
grequests.map(rs)

Is it possible to use grequests or gevent.Pool in a way that creates a number of sessions?

Put another way: how can I make many concurrent HTTP requests using either queuing or connection pooling?

Marcel Wilson asked Nov 06 '13 19:11


1 Answer

I ended up not using grequests to solve my problem. I'm still hopeful it might be possible.

I used threading:

import requests
from queue import Queue
from threading import Thread

class MyAwesomeThread(Thread):
    """
    Threading wrapper to handle counting and processing of tasks
    """
    def __init__(self, session, q):
        Thread.__init__(self)
        self.q = q
        self.count = 0
        self.session = session
        self.response = None

    def run(self):
        """Task run by the thread: pull (url, host) pairs off the queue forever."""
        while True:
            url, host = self.q.get()
            httpHeaders = {'Host' : host}
            # use this thread's own session so its connections get reused
            self.response = self.session.get(url, headers=httpHeaders)
            # handle response here
            self.count += 1
            self.q.task_done()

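CONCURRENT = 30      # number of worker threads (the concurrency limit from the question)
host = 'hostserver'  # Host header value, as in the question

q = Queue()
threads = []
for i in range(CONCURRENT):
    session = requests.session()  # one session per thread
    t = MyAwesomeThread(session, q)
    t.daemon = True  # daemon threads do not block exit, so an interrupt can end the program
    threads.append(t)


## build urls and add them to the Queue
for url in buildUrls():
    q.put_nowait((url, host))

## start the threads
for t in threads:
    t.start()

## wait until every queued url has been processed
q.join()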
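
The same idea (a fixed number of workers, each holding its own session) can probably be written more compactly with the standard library's thread pool. This is only a rough sketch, not something I ran against the real service: fetch_all and make_session are names I made up for illustration, and make_session is assumed to be any zero-argument callable that returns an object with a .get(url) method, such as requests.Session.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def fetch_all(urls, make_session, max_workers=30):
    """Fetch every URL with at most max_workers requests in flight.

    make_session is a hypothetical parameter: any zero-argument callable
    returning an object with a .get(url) method (e.g. requests.Session).
    """
    local = threading.local()  # holds one session per worker thread

    def fetch(url):
        # Create the session lazily, the first time this thread runs a task.
        if not hasattr(local, "session"):
            local.session = make_session()
        return local.session.get(url)

    # The executor caps concurrency; map() yields results in input order.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fetch, urls))
```

Calling fetch_all(urls, requests.Session, max_workers=30) would then create at most 30 sessions, one per worker thread, so connections are reused within each worker instead of a new socket being opened for every request. Per-request headers such as Host would still need to be passed inside fetch.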
Marcel Wilson answered Sep 29 '22 02:09