 

Python multithreading crawler

Hello! I am trying to write a web crawler in Python, and I want to use Python multithreading. Even after reading the papers and tutorials suggested earlier, I still have a problem. My code is here (the whole source code is here):

class Crawler(threading.Thread):

    global g_URLsDict 
    varLock = threading.Lock()
    count = 0

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.url = self.queue.get()

    def run(self):
        while 1:
            print self.getName()+" started" 
            self.page = getPage(self.url)
            self.parsedPage = getParsedPage(self.page, fix=True)
            self.urls = getLinksFromParsedPage(self.parsedPage)

            for url in self.urls:

                self.fp = hashlib.sha1(url).hexdigest()

                #url-seen check
                Crawler.varLock.acquire() #lock for global variable g_URLs
                if self.fp in g_URLsDict:
                    Crawler.varLock.release() #releasing lock
                else:
                    #print url+" does not exist"
                    Crawler.count +=1
                    print "total links: %d"%len(g_URLsDict)
                    print self.fp
                    g_URLsDict[self.fp] = url
                    Crawler.varLock.release() #releasing lock
                    self.queue.put(url)

                    print self.getName()+ " %d"%self.queue.qsize()
                    self.queue.task_done()
            #self.queue.task_done()
        #self.queue.task_done()


print g_URLsDict
queue = Queue.Queue()
queue.put("http://www.ertir.com")

for i in range(5):
    t = Crawler(queue)
    t.setDaemon(True)
    t.start()

queue.join()

It does not work as needed: it does not give any result after thread 1, it executes differently each time, and sometimes it gives this error:

Exception in thread Thread-2 (most likely raised during interpreter shutdown):

How can I fix it? Also, I am not sure this is any more effective than a simple for loop.

I have tried to fix run():

def run(self):
    while 1:
        print self.getName()+" started" 
        self.page = getPage(self.url)
        self.parsedPage = getParsedPage(self.page, fix=True)
        self.urls = getLinksFromParsedPage(self.parsedPage)

        for url in self.urls:

            self.fp = hashlib.sha1(url).hexdigest()

            #url-seen check
            Crawler.varLock.acquire() #lock for global variable g_URLs
            if self.fp in g_URLsDict:
                Crawler.varLock.release() #releasing lock
            else:
                #print url+" does not exist"
                print self.fp
                g_URLsDict[self.fp] = url
                Crawler.varLock.release() #releasing lock
                self.queue.put(url)

                print self.getName()+ " %d"%self.queue.qsize()
                #self.queue.task_done()
        #self.queue.task_done()
    self.queue.task_done()

I experimented with the task_done() call in different places; can anyone explain the difference?

asked by torayeff


1 Answer

You only call self.url = self.queue.get() once, when each thread initialises. You need to re-acquire urls from the queue inside your while loop if you want to pick up new urls for processing further down the line.

Try replacing self.page = getPage(self.url) with self.page = getPage(self.queue.get()). Be aware that get() will block indefinitely. You probably want to time out after a while and add some way for your background threads to exit gracefully on request (which would eliminate the Exception you saw).
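
As a rough illustration of that idea (not a drop-in replacement for your class; getPage here just stands in for your existing helpers), a worker that uses a timeout on get() and exits cleanly when the queue stays empty might look something like this:

import Queue
import threading

class Worker(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            try:
                # wait up to 5 seconds for a url to appear on the queue
                url = self.queue.get(timeout=5)
            except Queue.Empty:
                # nothing arrived in time -- assume the crawl is finished and exit
                return
            try:
                page = getPage(url)  # your existing helper, assumed to be in scope
                # ... parse the page and put any new urls back on the queue here ...
            finally:
                # always mark this url as processed so queue.join() can return
                self.queue.task_done()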

There are some good examples on effbot.org which use get() in the way I've described above.

Edit - Answers to your initial comments:

Have a look at the docs for task_done(): for every call to get() (that doesn't time out) you should eventually call task_done(), which lets a blocking join() know when every item on the queue has been processed. Each call to get() will block (sleep) while it waits for a new url to be posted on the queue.
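
To see that contract in isolation, here is a minimal, self-contained example (separate from your crawler) of how get(), task_done() and join() fit together:

import Queue
import threading

q = Queue.Queue()

def worker():
    while True:
        item = q.get()              # blocks until an item is available
        print "processing %s" % item
        q.task_done()               # mark this item as finished

for i in range(3):
    t = threading.Thread(target=worker)
    t.setDaemon(True)               # daemon threads exit with the main thread
    t.start()

for item in ["a", "b", "c"]:
    q.put(item)

q.join()                            # returns only when task_done() has been
                                    # called once for every item that was put()
print "all items processed"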

Edit2 - Try this alternative run function:

def run(self):
    while 1:
        print self.getName()+" started"
        url = self.queue.get() # <-- note that we're blocking here to wait for a url from the queue
        self.page = getPage(url)
        self.parsedPage = getParsedPage(self.page, fix=True)
        self.urls = getLinksFromParsedPage(self.parsedPage)

        for url in self.urls:

            self.fp = hashlib.sha1(url).hexdigest()

            #url-seen check
            Crawler.varLock.acquire() #lock for global variable g_URLs
            if self.fp in g_URLsDict:
                Crawler.varLock.release() #releasing lock
            else:
                #print url+" does not exist"
                Crawler.count +=1
                print "total links: %d"%len(g_URLsDict)
                print self.fp
                g_URLsDict[self.fp] = url
                Crawler.varLock.release() #releasing lock
                self.queue.put(url)

                print self.getName()+ " %d"%self.queue.qsize()

        self.queue.task_done() # <-- We've processed the url this thread pulled off the queue so indicate we're done with it.

answered by Jon Cage