We have a queue of jobs and workers process these jobs one at a time. Each job requires us to format some data and issue an HTTP POST request, with the data as the request payload.
How can we have each worker issue these HTTP POST requests asynchronously in a single-threaded, non-blocking manner? We don't care about the response from the request -- all we want is for the request to execute as soon as possible and then for the worker to immediately move on to the next job.
We have explored using gevent and the grequests library (see Why does gevent.spawn not execute the parameterized function until a call to Greenlet.join?). Our worker code looks something like this:
def execute_task(worker, job):
    print "About to spawn request"
    greenlet = gevent.spawn(requests.post, url, params=params)
    print "Request spawned, about to call sleep"
    gevent.sleep()
    print "Greenlet status: ", greenlet.ready()
The first print statement executes, but the second and third never appear, and the URL is never hit.
How can we get these asynchronous requests to execute?
1) create a Queue.Queue object
2) spawn as many "worker" threads as you like, each looping and reading from the Queue.Queue
3) feed the jobs onto the Queue.Queue
The worker threads will pull items off the Queue.Queue in the order they were put on it.
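The three steps above can be sketched end to end. This is a minimal, self-contained version (written in Python 3, where Queue.Queue has become queue.Queue; the doubling of each job is a stand-in for the real "format data and POST it" work):

```python
import queue
import threading

SENTINEL = object()  # tells a worker there is no more work

def worker(q, results):
    # loop and read jobs off the queue until the sentinel arrives
    while True:
        job = q.get()
        if job is SENTINEL:
            q.put(SENTINEL)  # re-queue so sibling workers also stop
            break
        results.append(job * 2)  # stand-in for "format data and POST it"

q = queue.Queue()
results = []
threads = [threading.Thread(target=worker, args=(q, results)) for _ in range(3)]
for t in threads:
    t.start()
for job in [1, 2, 3, 4]:
    q.put(job)
q.put(SENTINEL)
for t in threads:
    t.join()
print(sorted(results))  # [2, 4, 6, 8]
```

Note the sentinel is re-queued by whichever worker sees it first, so all three workers eventually shut down cleanly.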
Here is an example that reads lines from a file and puts them on a Queue.Queue:
import sys
import urllib2
import urllib
from Queue import Queue
import threading

THEEND = "TERMINATION-NOW-THE-END"

# read from a file into a Queue.Queue asynchronously
class QueueFile(threading.Thread):
    def run(self):
        if not isinstance(self.myq, Queue):
            print "Queue not set to a Queue"
            sys.exit(1)
        h = open(self.f, 'r')
        for l in h:
            self.myq.put(l.strip())  # this will block if the queue is full
        h.close()
        self.myq.put(THEEND)  # sentinel so consumers know the file is done

    def set_queue(self, q):
        self.myq = q

    def set_file(self, f):
        self.f = f
Here is an idea of what a worker thread might look like (example only):
class myWorker(threading.Thread):
    # self.q is assumed to have been set to the shared Queue before start()
    def run(self):
        n = 0  # count of completed requests
        while True:
            try:
                data = self.q.get()  # read from the fifo
                if data == THEEND:   # sentinel: no more work
                    break
                req = urllib2.Request("http://192.168.1.10/url/path")
                req.add_data(urllib.urlencode(data))  # data must be a dict or sequence of pairs
                h1 = urllib2.urlopen(req, timeout=10)
                res = h1.read()
                assert(len(res) > 80)
                n += 1
            except urllib2.HTTPError, e:
                print e
            except urllib2.URLError, e:
                print "done %d reqs " % n
                print e
                sys.exit()
To make an object based on threading.Thread go, create the instance and then call "start" on it -- run() does not execute until you do.
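As an illustration, here is a Python 3 analog of the QueueFile class above (Queue.Queue is queue.Queue in Python 3), wired up and started; the temporary file is a stand-in for the real input file:

```python
import queue
import tempfile
import threading

THEEND = "TERMINATION-NOW-THE-END"

class QueueFile(threading.Thread):
    # Python 3 analog of the class above: feeds file lines onto a queue
    def run(self):
        with open(self.f) as h:
            for line in h:
                self.myq.put(line.strip())
        self.myq.put(THEEND)  # sentinel: end of input

    def set_queue(self, q):
        self.myq = q

    def set_file(self, f):
        self.f = f

# write a small input file to feed the thread
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False) as tmp:
    tmp.write("one\ntwo\n")
    path = tmp.name

q = queue.Queue()
qf = QueueFile()
qf.set_queue(q)
qf.set_file(path)
qf.start()   # run() only executes after start() is called
qf.join()

lines = []
while True:
    item = q.get()
    if item == THEEND:
        break
    lines.append(item)
print(lines)  # ['one', 'two']
```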