How to use multiprocessing with requests module?

I am new to Python. My code is below:

import warnings
import requests
import multiprocessing

from colorama import init
init(autoreset=True)

from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", UserWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)

from bs4 import BeautifulSoup as BS

headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}


class Worker(multiprocessing.Process):

    def run(self):
        with open('ips.txt', 'r') as urls:
            for url in urls.readlines():
                req = url.strip()
                try:
                    page = requests.get(req, headers=headers, verify=False, allow_redirects=False, stream=True,
                                        timeout=10)
                    soup = BS(page.text)
                    # string = string.encode('ascii', 'ignore')
                    print('\033[32m' + req + ' - Title: ', soup.title)
                except requests.RequestException as e:
                    print('\033[32m' + req + ' - TimeOut!')
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

I am trying to make the program read ips.txt and print the title of each website.

It works flawlessly in a single thread. Now I want to make it faster by using multiprocessing.

But for some reason it just outputs the same line 5 times. I am new to multiprocessing, and my attempts so far have failed.

[Screenshot showing the problem]

I just want to run 5 workers that check ips.txt in parallel, using either threads or processes, to make it faster.

Any hint, clue, help?

asked Feb 25 '19 03:02 by asni kuwait


1 Answer

Issue

The primary issue in your code is that each Worker opens ips.txt from scratch and works on each URL found in ips.txt. Thus the five workers together open ips.txt five times and work on each URL five times.
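
To see the duplication in isolation, here is a minimal sketch that uses only the standard library. The names naive_worker and results are hypothetical, and a short in-memory list stands in for ips.txt. Every worker walks the whole list, just as the original run() does, so each URL is reported once per worker:

```python
import multiprocessing


def naive_worker(urls, results):
    # Like the original Worker.run(): each worker walks the *entire* list,
    # so nothing is shared out between the workers.
    name = multiprocessing.current_process().name
    for url in urls:
        results.put((name, url))


def main():
    # Stand-in for the contents of ips.txt (an assumption for this sketch).
    urls = ['https://example.com/', 'https://example.org/']
    results = multiprocessing.Queue()

    workers = [
        multiprocessing.Process(target=naive_worker, args=(urls, results))
        for _ in range(5)
    ]
    for w in workers:
        w.start()

    # Each of the 5 workers reports every URL, so 5 * 2 = 10 items arrive.
    # Read them before join() so no worker blocks while flushing its data.
    seen = [results.get() for _ in range(len(workers) * len(urls))]

    for w in workers:
        w.join()

    for url in urls:
        count = sum(1 for _, seen_url in seen if seen_url == url)
        print(url, 'was handled', count, 'times')


if __name__ == '__main__':
    main()
```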

Solution

The right way to solve this problem is to split the code into master and worker. You already have most of the worker code implemented. Let us treat the main section (under if __name__ == '__main__':) as the master for now.

The master launches five workers and sends work to them through a queue (multiprocessing.Queue).

The multiprocessing.Queue class lets multiple producers put data into it and multiple consumers read data from it. It implements the locking needed to exchange data safely between processes, so you do not have to guard against race conditions yourself.
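
As a rough illustration of that exchange, the sketch below passes numbers to two workers through one queue and collects their results through a second one. It is a toy, not the URL checker: the names square_worker, task_queue and result_queue are made up for this example, and None is used as a marker that tells a worker to stop.

```python
import multiprocessing


def square_worker(task_queue, result_queue):
    # Consumer of task_queue and producer for result_queue.
    while True:
        n = task_queue.get()
        if n is None:  # Stop marker sent by the master.
            break
        result_queue.put(n * n)


def main():
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    workers = [
        multiprocessing.Process(target=square_worker,
                                args=(task_queue, result_queue))
        for _ in range(2)
    ]
    for w in workers:
        w.start()

    # Both workers read from the same queue; each number goes to one of them.
    for n in range(6):
        task_queue.put(n)

    # One stop marker per worker.
    for _ in workers:
        task_queue.put(None)

    # Collect the results before joining, so that no worker is left
    # waiting to flush its queued results when it exits.
    results = sorted(result_queue.get() for _ in range(6))

    for w in workers:
        w.join()

    print(results)  # prints [0, 1, 4, 9, 16, 25]


if __name__ == '__main__':
    main()
```

The results are sorted only because the order in which the two workers finish is not predictable.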

Fixed Code

Here is how your code could be rewritten as per what I've described above:

import warnings
import requests
import multiprocessing

from colorama import init
init(autoreset=True)

from requests.packages.urllib3.exceptions import InsecureRequestWarning
warnings.simplefilter("ignore", UserWarning)
warnings.simplefilter('ignore', InsecureRequestWarning)

from bs4 import BeautifulSoup as BS

headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/39.0.2171.95 Safari/537.36'}


class Worker(multiprocessing.Process):

    def __init__(self, job_queue):
        super().__init__()
        self._job_queue = job_queue

    def run(self):
        while True:
            url = self._job_queue.get()
            if url is None:
                break

            req = url.strip()

            try:
                page = requests.get(req, headers=headers, verify=False, allow_redirects=False, stream=True,
                                    timeout=10)
                # Name the parser explicitly so the result does not depend
                # on which parsers happen to be installed.
                soup = BS(page.text, 'html.parser')
                print('\033[32m' + req + ' - Title: ', soup.title)
            except requests.RequestException as e:
                # Not every failure is a timeout, so report the real error.
                print('\033[32m' + req + ' - Failed: ', e)


if __name__ == '__main__':
    jobs = []
    job_queue = multiprocessing.Queue()

    for i in range(5):
        p = Worker(job_queue)
        jobs.append(p)
        p.start()

    # This is the master code that feeds URLs into queue.
    with open('ips.txt', 'r') as urls:
        for url in urls.readlines():
            job_queue.put(url)

    # Send None for each worker to check and quit.
    for j in jobs:
        job_queue.put(None)

    for j in jobs:
        j.join()

We can see in the above code that the master opens ips.txt once, reads the URLs from it one by one and puts them into the queue. Each worker waits for a URL to arrive on this queue. As soon as a URL arrives on the queue, one of the workers picks it up and gets busy. If there are more URLs in the queue, the next free worker picks the next one up and so on.

Finally, we need some way for the workers to quit when all work is done. There are several ways to achieve this. In this example, I have chosen a simple strategy of sending five sentinel values (five None values in this case) into the queue, one for each worker, so that each worker can pick this up and quit.

There is another strategy where the workers and the master share a multiprocessing.Event object, just as they share a multiprocessing.Queue object right now. The master invokes the set() method of this object whenever it wants the workers to quit. The workers periodically call is_set() on it and quit once it returns True. However, this introduces some additional complexity into the code, which I discuss below.

For the sake of completeness, and to provide minimal, complete, and verifiable examples, I am presenting two code examples below, one for each stopping strategy.

Using Sentinel Value to Stop Workers

This is pretty much what I have described above so far except that the code example has been simplified a lot to remove dependencies on any libraries outside the Python standard library.

Another thing worth noting in the example below is that instead of creating a worker class, we use a worker function and create a Process out of it. This type of code is often found in the Python documentation and it is quite idiomatic.

import multiprocessing
import time
import random


def worker(input_queue):
    while True:
        url = input_queue.get()

        if url is None:
            break

        print('Started working on:', url)

        # Random delay to simulate fake processing.
        time.sleep(random.randint(1, 3))

        print('Stopped working on:', url)


def master():
    urls = [
        'https://example.com/',
        'https://example.org/',
        'https://example.net/',
        'https://stackoverflow.com/',
        'https://www.python.org/',
        'https://github.com/',
        'https://susam.in/',
    ]

    input_queue = multiprocessing.Queue()
    workers = []

    # Create workers.
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(input_queue, ))
        workers.append(p)
        p.start()

    # Distribute work.
    for url in urls:
        input_queue.put(url)

    # Ask the workers to quit.
    for w in workers:
        input_queue.put(None)

    # Wait for workers to quit.
    for w in workers:
        w.join()

    print('Done')


if __name__ == '__main__':
    master()
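
The sentinel loop above can also be written with the two-argument form of the built-in iter(), which keeps calling a function until it returns the sentinel. This is only a more compact spelling of the same idea, sketched here with two workers and two URLs to keep it short:

```python
import multiprocessing


def worker(input_queue):
    # iter(callable, sentinel) calls input_queue.get() repeatedly and
    # stops as soon as it returns the sentinel value None.
    for url in iter(input_queue.get, None):
        print('Working on:', url)


def main():
    input_queue = multiprocessing.Queue()

    workers = [
        multiprocessing.Process(target=worker, args=(input_queue,))
        for _ in range(2)
    ]
    for w in workers:
        w.start()

    for url in ['https://example.com/', 'https://example.org/']:
        input_queue.put(url)

    # One sentinel per worker, exactly as before.
    for _ in workers:
        input_queue.put(None)

    for w in workers:
        w.join()

    print('Done')


if __name__ == '__main__':
    main()
```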

Using Event to Stop Workers

Using a multiprocessing.Event object to signal when workers should quit introduces some complexity in the code. There are primarily four changes that have to be made:

  • In the master, we invoke the set() method on the Event object to signal that workers should quit as soon as possible.
  • In the worker, we invoke the is_set() method of the Event object periodically to check if it should quit.
  • In the master, we need to use multiprocessing.JoinableQueue instead of multiprocessing.Queue so that it can test if the queue has been consumed completely by the workers before it asks the workers to quit.
  • In the worker, we need to invoke the task_done() method of the queue for every item taken from the queue. The master's call to the queue's join() method returns only once every item has been marked as done this way.

All of these changes can be found in the code below:

import multiprocessing
import time
import random
import queue


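def worker(input_queue, stop_event):
    while not stop_event.is_set():
        try:
            # Wait up to 1 second for a URL to arrive in the input
            # queue. If none arrives, loop back and check the stop
            # event again.
            url = input_queue.get(True, 1)
        except queue.Empty:
            continue

        print('Started working on:', url)

        # Random delay to simulate fake processing.
        time.sleep(random.randint(1, 3))

        print('Stopped working on:', url)

        # Mark the item as done only after it has been processed, so
        # that input_queue.join() in the master waits for the work
        # itself and not merely for the item to be fetched.
        input_queue.task_done()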


def master():
    urls = [
        'https://example.com/',
        'https://example.org/',
        'https://example.net/',
        'https://stackoverflow.com/',
        'https://www.python.org/',
        'https://github.com/',
        'https://susam.in/',
    ]

    input_queue = multiprocessing.JoinableQueue()
    stop_event = multiprocessing.Event()

    workers = []

    # Create workers.
    for i in range(5):
        p = multiprocessing.Process(target=worker,
                                    args=(input_queue, stop_event))
        workers.append(p)
        p.start()

    # Distribute work.
    for url in urls:
        input_queue.put(url)

    # Wait for the queue to be consumed.
    input_queue.join()

    # Ask the workers to quit.
    stop_event.set()

    # Wait for workers to quit.
    for w in workers:
        w.join()

    print('Done')


if __name__ == '__main__':
    master()
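
As an aside that goes beyond the two examples above: the standard library's multiprocessing.Pool packages the same master/worker arrangement, including the queueing and the shutdown, behind a single map() call. The sketch below is only an outline under assumptions. The function check is a hypothetical placeholder for the real work (the requests.get() call and title extraction), and the two hard-coded lines stand in for the contents of ips.txt.

```python
import multiprocessing


def check(url):
    # Placeholder for the real work (for example, requests.get() followed
    # by title extraction); here it only builds a string.
    return url.strip() + ' - checked'


def main():
    # Stand-in for the lines of ips.txt, trailing newlines included.
    urls = ['https://example.com/\n', 'https://example.org/\n']

    # Pool(5) starts five worker processes. map() hands the items out to
    # them and returns the results in the same order as the input.
    with multiprocessing.Pool(5) as pool:
        for line in pool.map(check, urls):
            print(line)


if __name__ == '__main__':
    main()
```

Because the workers return values instead of printing, the master decides how and in what order the output appears.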

answered Sep 20 '22 18:09 by Susam Pal