I'm quite new to Twisted and I'm trying to make an asynchronous client that fetches some URLs and saves the result for each URL into a separate file. When I run the program with a limited number of servers, say 10, the reactor loop ends correctly and the program terminates. But when I run it with, for example, the Alexa top 2500, the program starts fetching URLs but never terminates. I've set a timeout, but it doesn't work; I believe there must be some open socket that never triggers any callback, either for error or for success. My aim is that once the program has fetched the pages, or the per-connection timeout has expired, it terminates and closes all the active file descriptors.
Sorry, the code indentation was lost when copying and pasting; I've checked and it's fixed now. The code is the bare minimum needed to show the problem: the reactor does not stop when I start the program with a huge number of sites to crawl.
#!/usr/bin/env python
from pprint import pformat
from twisted.internet import reactor
import twisted.internet.defer
import sys
from twisted.internet.protocol import Protocol
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
class PrinterClient(Protocol):
    def __init__(self, whenFinished, output):
        self.whenFinished = whenFinished
        self.output = output

    def dataReceived(self, bytes):
        #print '##### Received #####\n%s' % (bytes,)
        self.output.write('%s' % (bytes,))

    def connectionLost(self, reason):
        print 'Finished:', reason.getErrorMessage()
        self.output.write('Finished: %s \n' % (reason.getErrorMessage()))
        self.output.write('#########end########%s\n' % (reason.getErrorMessage()))
        self.whenFinished.callback(None)

def handleResponse(r, output, url):
    output.write('############start############\n')
    output.write('%s\n' % (url))
    #print "version=%s\ncode=%s\nphrase='%s'" % (r.version, r.code, r.phrase)
    output.write("version=%s\ncode=%s\nphrase='%s'"
                 % (r.version, r.code, r.phrase))
    for k, v in r.headers.getAllRawHeaders():
        #print "%s: %s" % (k, '\n '.join(v))
        output.write("%s: %s\n" % (k, '\n '.join(v)))
    whenFinished = twisted.internet.defer.Deferred()
    r.deliverBody(PrinterClient(whenFinished, output))
    return whenFinished

def handleError(reason):
    print reason
    #reason.printTraceback()
    #reactor.stop()

def getPage(url, output):
    print "Requesting %s" % (url,)
    d = Agent(reactor).request('GET',
                               url,
                               Headers({'User-Agent': ['Mozilla/4.0 (Windows XP 5.1) Java/1.6.0_26']}),
                               None)
    d._connectTimeout = 10
    d.addCallback(handleResponse, output, url)
    d.addErrback(handleError)
    return d
if __name__ == '__main__':
    semaphore = twisted.internet.defer.DeferredSemaphore(500)
    dl = list()
    ipset = set()
    queryset = set(['http://www.google.com', 'http://www.google1.com', 'http://www.google2.com'])  # ... up to 2500 sites
    filemap = {}
    for q in queryset:
        fpos = q.split('http://')[1].split(':')[0]
        # One output file per host; filemap must be filled before it is used.
        filemap[fpos] = open(fpos, 'w')
        dl.append(semaphore.run(getPage, q, filemap[fpos]))
    dl = twisted.internet.defer.DeferredList(dl)
    dl.addCallbacks(lambda x: reactor.stop(), handleError)
    reactor.run()
    for k in filemap:
        filemap[k].close()
Thanks. Jeppo
There are at least two issues with your timeout code.
First, the only timeout you set is _connectTimeout, and you set it on the Deferred returned from Agent.request. That attribute is meaningless there: nothing in the Agent implementation, nor any other part of Twisted, will respect it. I think you meant to set it on the Agent instance instead, where it would have had some effect. However, it is a private attribute not meant for you to interact with directly. Instead, you should pass connectTimeout=10 to the Agent initializer.
Second, the connection timeout only affects TCP connection setup. Setting it to 10 means that if a TCP connection to the HTTP server for a particular URL cannot be established within 10 seconds, the request attempt fails with a timeout error. If the connection is established in less than 10 seconds, though, the timeout has no further meaning. If the server takes 10 hours to send you a response, Agent will sit there and wait for 10 hours. You need an additional timeout: a whole-request timeout.
A whole-request timeout is something you implement separately, using reactor.callLater and possibly Deferred.cancel. For example:
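...
d = agent.request(...)
# Cancel the request if it has not finished within 60 seconds.
timeoutCall = reactor.callLater(60, d.cancel)
def completed(passthrough):
    # The request finished (successfully or not) first: stop the pending timeout.
    if timeoutCall.active():
        timeoutCall.cancel()
    return passthrough
d.addBoth(completed)
...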