Which strategy to use with multiprocessing in Python

I am completely new to multiprocessing. I have been reading the documentation for the multiprocessing module, and I have read about Pool, threads, queues, etc., but I am completely lost.

What I want to do with multiprocessing is convert my humble HTTP downloader to work with multiple workers. What I do at the moment is: download a page, parse the page to find interesting links, and continue until all interesting links have been downloaded. Now I want to implement this with multiprocessing, but I have no idea how to organize this workflow. I have had two thoughts about it. First, I thought about having two queues: one queue for links that need to be downloaded, the other for pages to be parsed. One worker downloads pages and adds them to the queue of items that need to be parsed; another worker parses a page and adds the links it finds interesting to the download queue. The problems I expect from this approach are: first of all, why download only one page at a time and parse only one page at a time? Moreover, how does a worker know that more items will be added to the queue later, after it has exhausted all the items currently in the queue?
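Roughly, the two-queue layout I have in mind would look something like this (just a sketch with stand-in download_page and find_links functions, not my real code):

import multiprocessing as mp
import time

def download_page(url):
    # stand-in for the real download code
    return 'contents of ' + url

def find_links(page):
    # stand-in for the real link extraction; finds nothing interesting here
    return []

def downloader(download_queue, parse_queue):
    while True:
        url = download_queue.get()
        parse_queue.put(download_page(url))

def parser(parse_queue, download_queue):
    while True:
        page = parse_queue.get()
        for link in find_links(page):
            download_queue.put(link)

if __name__ == '__main__':
    download_queue = mp.Queue()
    parse_queue = mp.Queue()
    download_queue.put('http://example.com')
    for target, args in [(downloader, (download_queue, parse_queue)),
                         (parser, (parse_queue, download_queue))]:
        p = mp.Process(target=target, args=args)
        p.daemon = True  # the workers never return on their own
        p.start()
    time.sleep(2)  # and here is my problem: how do I know when the work is really done?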

The other approach I thought of is this: have a function that can be called with a URL as an argument. This function downloads the document and starts parsing it for links. Every time it encounters an interesting link, it immediately spawns a new process running the same function as itself. The problems I have with this approach are: how do I keep track of all the processes spawned all around, and how do I know whether there are still processes running? Also, how do I limit the maximum number of processes?
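The second idea, again only as a sketch (it shows exactly the problems I am asking about: nothing tracks the spawned processes and nothing caps how many there are):

import multiprocessing as mp

def find_links(content):
    # stand-in for the real parsing; finds nothing interesting here
    return []

def handle(url):
    content = 'contents of ' + url  # stand-in for the real download
    for link in find_links(content):
        # every interesting link immediately gets its own process;
        # nothing tracks these children and nothing limits their number
        mp.Process(target=handle, args=(link,)).start()

if __name__ == '__main__':
    handle('http://example.com')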

So I am completely lost. Can anyone suggest a good strategy, and perhaps show some example code of how to go about it?

asked Sep 24 '11 by yasar

2 Answers

Here is one approach, using multiprocessing. (Many thanks to @Voo for suggesting many improvements to the code.)

import multiprocessing as mp
import logging
import Queue
import time

logger=mp.log_to_stderr(logging.DEBUG)  # or, 
# logger=mp.log_to_stderr(logging.WARN) # uncomment this to silence debug and info messages

def worker(url_queue,seen):
    while True:
        url=url_queue.get()
        if url not in seen:
            logger.info('downloading {u}'.format(u=url))
            seen[url]=True
            # Replace this with code to download the url, e.g.
            # urllib2.urlopen(...)
            time.sleep(0.5)
            content=url  # in this demo the "urls" are just small integers
            logger.debug('parsing {c}'.format(c=content))
            # replace this with code that finds interesting links and
            # puts them in url_queue
            for i in range(3):
                if content<5:
                    u=2*content+i-1
                    logger.debug('adding {u} to url_queue'.format(u=u))
                    time.sleep(0.5)
                    url_queue.put(u)
        else:
            logger.debug('skipping {u}; seen before'.format(u=url))
        url_queue.task_done()

if __name__=='__main__':
    num_workers=4
    url_queue=mp.JoinableQueue()
    manager=mp.Manager()
    seen=manager.dict()

    # prime the url queue with at least one url
    url_queue.put(1)
    downloaders=[mp.Process(target=worker,args=(url_queue,seen))
                 for i in range(num_workers)]
    for p in downloaders:
        p.daemon=True
        p.start()
    url_queue.join()

  • A pool of (4) worker processes is created.
  • There is a JoinableQueue, called url_queue.
  • Each worker gets a url from the url_queue, finds new urls and adds them to the url_queue.
  • Each worker calls url_queue.task_done() only after it has added any new urls it found, so the queue's count of unfinished tasks never reaches zero while follow-up work is still pending.
  • The main process calls url_queue.join(). This blocks the main process until task_done has been called for every item that was put in the url_queue.
  • Since the worker processes have the daemon attribute set to True, they too end when the main process ends.

All the components used in this example are also explained in Doug Hellmann's excellent Python Module of the Week tutorial on multiprocessing.
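As written, the example is Python 2 code: the Queue import (not actually used above) and the urllib2 comment are the Python 2 spellings. On Python 3 the same structure works unchanged; only those names differ, roughly like this (urlopen is just one possible way to do the real download):

# Python 3 spellings of the Python 2 bits above
import queue                           # instead of: import Queue
from urllib.request import urlopen     # instead of: urllib2.urlopen

# inside worker(), the download placeholder could then become:
# content = urlopen(url).read()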

answered Sep 19 '22 by unutbu

What you're describing is essentially graph traversal. Most graph traversal algorithms (those more sophisticated than depth-first) keep track of two sets of nodes; in your case, the nodes are URLs.

The first set is called the "closed set", and represents all of the nodes that have already been visited and processed. If, while you're processing a page, you find a link that happens to be in the closed set, you can ignore it: it's already been handled.

The second set is, unsurprisingly, called the "open set", and includes all of the nodes that have been found but not yet processed.

The basic mechanism is to start by putting the root node into the open set (the closed set is initially empty; no nodes have been processed yet) and start working. Each worker takes a single node from the open set, copies it to the closed set, processes the node, and adds any nodes it discovers back to the open set (so long as they aren't already in either the open or the closed set). Once the open set is empty (and no workers are still processing nodes), the graph has been completely traversed.
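In single-process terms, the mechanism boils down to something like this (a minimal sketch; fetch_links is a placeholder for whatever downloads a page and returns the links found on it):

from collections import deque

def traverse(root, fetch_links):
    open_set = deque([root])  # nodes found but not yet processed
    closed_set = set()        # nodes already visited and processed
    while open_set:
        node = open_set.popleft()
        if node in closed_set:
            continue
        closed_set.add(node)
        for found in fetch_links(node):  # "process" the node
            if found not in closed_set:
                open_set.append(found)
    return closed_set

All the multiprocessing version has to do is preserve exactly this bookkeeping while several workers call fetch_links at once.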

Actually implementing this with multiprocessing probably means that you'll have a master task that keeps track of the open and closed sets. When a worker in a worker pool indicates that it is ready for work, the master moves a node from the open set to the closed set and hands it to the worker. The workers can then pass all of the nodes they find back to the master without worrying about whether they are already closed; the master simply ignores nodes that are already closed.
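One way to arrange that with a multiprocessing.Pool, again only as a sketch (fetch_links is the same placeholder, and the "master" here is simply the main process driving the pool):

import multiprocessing as mp

def fetch_links(url):
    # placeholder: download the page at `url` and return the urls found on it
    return []

def crawl(root, num_workers=4):
    open_set = {root}    # found, not yet processed
    closed_set = set()   # already handed to a worker
    pending = []         # AsyncResults for nodes the workers are processing
    pool = mp.Pool(num_workers)
    while open_set or pending:
        # the master moves nodes from open to closed and dispatches them
        while open_set:
            node = open_set.pop()
            closed_set.add(node)
            pending.append(pool.apply_async(fetch_links, (node,)))
        # collect whatever one worker found; already-closed nodes are ignored
        for found in pending.pop(0).get():
            if found not in closed_set:
                open_set.add(found)
    pool.close()
    pool.join()
    return closed_set

if __name__ == '__main__':
    print(crawl('http://example.com'))

The workers only ever run fetch_links; all of the open/closed bookkeeping stays in the master, which is what keeps the already-visited check free of races.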

answered Sep 19 '22 by SingleNegationElimination