How do I send asynchronous HTTP requests in Python one at a time?

We have a queue of jobs and workers process these jobs one at a time. Each job requires us to format some data and issue an HTTP POST request, with the data as the request payload.

How can we have each worker issue these HTTP POST requests asynchronously in a single-threaded, non-blocking manner? We don't care about the response from the request -- all we want is for the request to execute as soon as possible and then for the worker to immediately move on to the next job.

We have explored using gevent and the grequests library (see Why does gevent.spawn not execute the parameterized function until a call to Greenlet.join?). Our worker code looks something like this:

def execute_task(worker, job):

    print "About to spawn request"
    greenlet = gevent.spawn(requests.post, url, params=params)

    print "Request spawned, about to call sleep"
    gevent.sleep()

    print "Greenlet status: ", greenlet.ready()

The first print statement executes, but the second and third print statements are never printed and the URL is never hit.

How can we get these asynchronous requests to execute?
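
One detail worth noting about the gevent approach: requests does blocking socket I/O, so gevent can only schedule it cooperatively once the socket layer has been monkey-patched (grequests does this patching internally). A minimal sketch of what that looks like, with a made-up URL and payload standing in for the real ones:

from gevent import monkey
monkey.patch_all()  # must run before requests is imported so its sockets cooperate with gevent

import gevent
import requests

# fire-and-forget: spawn the POST and yield once so the greenlet starts sending;
# it finishes whenever this worker next yields to the gevent hub
greenlet = gevent.spawn(requests.post, "http://example.com/endpoint", params={"key": "value"})
gevent.sleep(0)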

asked Oct 21 '22 by David Kravitz

1 Answer

1) make a Queue.Queue object

2) make as many "worker" threads as you like, each looping and reading from the Queue.Queue

3) feed the jobs onto the Queue.Queue

The worker threads will pull jobs off the Queue.Queue in the order the jobs were placed on it.

Here is an example that reads lines from a file and puts them on a Queue.Queue:

import sys
import urllib2
import urllib
from Queue import Queue
import threading

THEEND = "TERMINATION-NOW-THE-END"


# read lines from a file and put them on a Queue.Queue asynchronously
class QueueFile(threading.Thread):
    def run(self):
        if not isinstance(self.myq, Queue):
            print "Queue not set to a Queue"
            sys.exit(1)
        h = open(self.f, 'r')
        for l in h:
            self.myq.put(l.strip())  # this will block if the queue is full
        h.close()
        self.myq.put(THEEND)  # sentinel telling the workers there is no more work

    def set_queue(self, q):
        self.myq = q

    def set_file(self, f):
        self.f = f

Here is an idea of what a worker thread might look like (example only):

class myWorker(threading.Thread):
    def run(self):
        n = 0  # number of requests completed so far
        while True:
            data = self.q.get()  # read the next job from the fifo (blocks until one is available)
            if data == THEEND:
                self.q.put(THEEND)  # pass the sentinel on so any other workers also stop
                break

            try:
                req = urllib2.Request("http://192.168.1.10/url/path")
                req.add_data(urllib.urlencode({"line": data}))  # "line" is just an illustrative field name
                h1 = urllib2.urlopen(req, timeout=10)
                res = h1.read()
                assert(len(res) > 80)
                n += 1

            except urllib2.HTTPError, e:
                print e

            except urllib2.URLError, e:
                print "done %d reqs " % n
                print e
                sys.exit()

    def set_queue(self, q):
        self.q = q

To make the objects based on threading.Thread go, create the object and then call "start" on the instance, as in the sketch below.
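
For completeness, a minimal sketch of wiring the pieces together, assuming the set_queue/set_file setters above; the file name and the worker count of two are just illustrative choices:

# create a bounded queue so the file reader blocks instead of growing it without limit
q = Queue(maxsize=100)

# producer: reads "jobs.txt" (illustrative name) and feeds lines onto the queue
qf = QueueFile()
qf.set_queue(q)
qf.set_file("jobs.txt")

# consumers: two workers, each pulling jobs off the same queue
workers = [myWorker(), myWorker()]
for w in workers:
    w.set_queue(q)

# calling start() runs each object's run() method in its own thread
qf.start()
for w in workers:
    w.start()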

answered Oct 31 '22 by Vorsprung