Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

python twisted agent timeout

I'm quite new to twisted and I'm trying to make an asynchronous client that fetches some urls and saves the result into a distinct file for each url. When I run the program with a limited amount of servers let's say 10, the reactor loop correctly ends and the program terminates. But when I run the program with for example the Alexa top 2500, the program starts to fetch urls but then does not terminate. I've set the timeout but it does not work, I believe that there must be some open socket that does not trigger any callback either for error or success. My aim is once the program has fetched the pages or the per-connection timeout has expired the program has to terminate and close all the file descriptors active.

I'm Sorry but the code indentation is not kept while copy and pasting, now I've checked and it's fixed. The code is the bare minimum to give an example, note that with my problem is that the reactor is not stopping when I start the program with a huge number of sites to crawl.

#!/usr/bin/env python

from pprint import pformat
from twisted.internet import reactor
import twisted.internet.defer
import sys
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent
from twisted.web.http_headers import Headers

class PrinterClient(Protocol):
    def __init__(self, whenFinished, output):
         self.whenFinished = whenFinished
         self.output = output

    def dataReceived(self, bytes):
         #print '##### Received #####\n%s' % (bytes,)
         self.output.write('%s' % (bytes,))

    def connectionLost(self, reason):
        print 'Finished:', reason.getErrorMessage()
        self.output.write('Finished: %s \n'%(reason.getErrorMessage()))
        self.output.write('#########end########%s\n'%(reason.getErrorMessage()))
        self.whenFinished.callback(None)

def handleResponse(r, output, url):
    output.write('############start############\n')
    output.write('%s\n'%(url))
    #print "version=%s\ncode=%s\nphrase='%s'" % (r.version, r.code, r.phrase)
    output.write("version=%s\ncode=%s\nphrase='%s'"\
             %(r.version, r.code, r.phrase))
    for k, v in r.headers.getAllRawHeaders():
        #print "%s: %s" % (k, '\n  '.join(v))
        output.write("%s: %s\n" % (k, '\n  '.join(v)))
    whenFinished = twisted.internet.defer.Deferred()
    r.deliverBody(PrinterClient(whenFinished, output))
    return whenFinished

def handleError(reason):
    print reason
    #reason.printTraceback()
    #reactor.stop()

def getPage(url, output):
    print "Requesting %s" % (url,)
    d = Agent(reactor).request('GET',
                       url,
    Headers({'User-Agent': ['Mozilla/4.0 (Windows XP 5.1) Java/1.6.0_26']}),
                       None)
    d._connectTimeout = 10
    d.addCallback(handleResponse, output, url)
    d.addErrback(handleError)
    return d

if __name__ == '__main__':
    semaphore = twisted.internet.defer.DeferredSemaphore(500)
    dl = list()
    ipset = set()
    queryset =  set(['http://www.google.com','http://www.google1.com','http://www.google2.com', "up to 2500 sites"])
    filemap = {}
    for q in queryset:
        fpos = q.split('http://')[1].split(':')[0]
        dl.append(semaphore.run(getPage, q, filemap[fpos]))
    dl = twisted.internet.defer.DeferredList(dl)
    dl.addCallbacks(lambda x: reactor.stop(), handleError)
    reactor.run()
    for k in filemap:
        filemap[k].close()

Thanks. Jeppo

like image 750
user2092063 Avatar asked Feb 18 '23 11:02

user2092063


1 Answers

There are at least two issues with your timeout code.

First, the only timeout you set is _connectTimeout, and you set it on a Deferred returned from Agent.request. This is a meaningless attribute, and nothing in the Agent implementation nor any part of Twisted will respect it. I think you meant to set this attribute on the Agent instance instead, where it would have had some affect. However, it is a private attribute not meant for you to interact with directly. Instead, you should pass connectTimeout=10 to the Agent initializer.

Second, this timeout only affects the TCP connection setup timeout. Setting it to 10 means that if a TCP connection to the HTTP server for a particular URL cannot be established in fewer than 10 seconds, the request attempt will be failed with a timeout error. If the connection is successfully established in less than 10 seconds, though, the timeout has no further meaning. If the server takes 10 hours to send you a response, Agent will sit there and wait for 10 hours. You need an additional timeout, a whole-request timeout.

This is something to implement separately using reactor.callLater and possibly Deferred.cancel. For example,

...
d = agent.request(...)
timeoutCall = reactor.callLater(60, d.cancel)
def completed(passthrough):
    if timeoutCall.active():
        timeoutCall.cancel()
    return passthrough
d.addBoth(completed)
...
like image 128
Jean-Paul Calderone Avatar answered Feb 20 '23 11:02

Jean-Paul Calderone