I'm working with CherryPy on a server that implements a RESTful-like API. The responses require some heavy computation that takes about 2 seconds per request. That computation uses some data that is updated three times a day.
The data is updated in the background (which takes about half an hour), and once it is ready, the references to the new data are passed to the functions that answer the requests. That swap takes just a millisecond.
What I need is to guarantee that each request is answered with either the old data or the new data, but that no request is processed while the data references are being changed. Ideally, I would like to find a way to buffer incoming requests while the references are swapped, and to ensure that the swap happens only after all in-flight requests have finished.
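The behaviour I want is essentially a readers-writer gate; here is a minimal sketch of it, independent of CherryPy (DataHolder, read and swap are names I made up for illustration, not CherryPy API):

```python
import threading

class DataHolder:
    """Sketch of the desired behaviour: many concurrent readers,
    and a writer that waits for in-flight readers before swapping."""

    def __init__(self, data):
        self._data = data
        self._readers = 0                 # requests currently in flight
        self._cond = threading.Condition()

    def read(self):
        with self._cond:
            self._readers += 1            # register this request
        try:
            return self._data             # the 2 s computation would happen here
        finally:
            with self._cond:
                self._readers -= 1
                self._cond.notify_all()   # wake a writer waiting to swap

    def swap(self, new_data):
        with self._cond:
            while self._readers:          # wait until in-flight requests drain
                self._cond.wait()
            self._data = new_data         # no new reader can register during this

holder = DataHolder('old')
print(holder.read())    # old
holder.swap('new')
print(holder.read())    # new
```

Under a continuous stream of readers the writer could starve in this sketch, but for a swap that happens three times a day that is not a concern.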
My current (non-working) minimal example is as follows:
import time

import cherrypy
from cherrypy.process import plugins

theData = 0

def processData():
    """Background task that works for half an hour three times a day;
    when it finishes, it publishes the result on the engine bus."""
    global theData  # using a global variable to simplify the example
    theData += 1
    cherrypy.engine.publish("doChangeData", theData)

class DataPublisher(object):

    def __init__(self):
        self.data = 'initData'
        cherrypy.engine.subscribe('doChangeData', self.changeData)

    def changeData(self, newData):
        cherrypy.engine.log("Changing data, buffering should start!")
        self.data = newData
        time.sleep(1)  # exaggeration of the ~1 ms reference update, to visualize the problem
        cherrypy.engine.log("Continue serving buffered and new requests.")

    @cherrypy.expose
    def index(self):
        result = "I get " + str(self.data)
        cherrypy.engine.log(result)
        time.sleep(3)
        return result

if __name__ == '__main__':
    conf = {
        'global': {
            'server.socket_host': '127.0.0.1',
            'server.socket_port': 8080,
        }
    }
    cherrypy.config.update(conf)
    btask = plugins.BackgroundTask(5, processData)  # 5 s instead of three times a day, for the example
    btask.start()
    cherrypy.quickstart(DataPublisher())
If I run this script, open a browser at localhost:8080 and refresh the page repeatedly, I get:
...
[17/Sep/2015:21:32:41] ENGINE Changing data, buffering should start!
127.0.0.1 - - [17/Sep/2015:21:32:41] "GET / HTTP/1.1" 200 7 "...
[17/Sep/2015:21:32:42] ENGINE I get 3
[17/Sep/2015:21:32:42] ENGINE Continue serving buffered and new requests.
127.0.0.1 - - [17/Sep/2015:21:24:44] "GET / HTTP/1.1" 200 7 "...
...
This means that some request processing starts before and ends after the reference swap begins or ends. I want to avoid both cases. I would like something like:
...
127.0.0.1 - - [17/Sep/2015:21:32:41] "GET / HTTP/1.1" 200 7 "...
[17/Sep/2015:21:32:41] ENGINE Changing data, buffering should start!
[17/Sep/2015:21:32:42] ENGINE Continue serving buffered and new requests.
[17/Sep/2015:21:32:42] ENGINE I get 3
127.0.0.1 - - [17/Sep/2015:21:24:44] "GET / HTTP/1.1" 200 7 "...
...
I searched the documentation and the web and found these references, which do not completely cover this case:
http://www.defuze.org/archives/198-managing-your-process-with-the-cherrypy-bus.html
How to execute asynchronous post-processing in CherryPy?
http://tools.cherrypy.org/wiki/BackgroundTaskQueue
Cherrypy : which solutions for pages with large processing time
How to stop request processing in Cherrypy?
Update (with a simple solution):
After giving it more thought, I think the question is misleading, since it bakes some implementation requirements into the question itself, namely: stop processing and start buffering. The requirement can be simplified to: make sure that each request is processed either with the old data or with the new data.
For the latter, it is enough to store a temporary local reference to the data in use. That reference can be used throughout the request processing, and it is no problem if another thread rebinds self.data in the meantime. For Python objects, the garbage collector takes care of the old data.
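The trick can be seen without CherryPy at all; a small sketch (Holder, handler and the event are illustrative only):

```python
import threading
import time

class Holder:
    def __init__(self):
        self.data = [1, 2, 3]        # stand-in for the heavy data

holder = Holder()
grabbed = threading.Event()
seen = []

def handler():
    snapshot = holder.data           # one attribute read; rebinding is atomic in CPython
    grabbed.set()                    # tell the updater we took our reference
    time.sleep(0.1)                  # stand-in for the 2 s of computation
    seen.append(snapshot)            # still the object grabbed at the start

t = threading.Thread(target=handler)
t.start()
grabbed.wait()                       # make the interleaving deterministic
holder.data = [4, 5, 6]              # the updater swaps the reference mid-request
t.join()

print(seen[0])                       # [1, 2, 3]: the request finished on the old data
print(holder.data)                   # [4, 5, 6]: new requests will see the new data
```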
Specifically, it is enough to change the index function to:
@cherrypy.expose
def index(self):
    tempData = self.data
    result = "I started with %s" % str(tempData)
    time.sleep(3)  # heavy use of tempData
    result += " that changed to %s" % str(self.data)
    result += " but I am still using %s" % str(tempData)
    cherrypy.engine.log(result)
    return result
And as a result we will see:
[21/Sep/2015:10:06:00] ENGINE I started with 1 that changed to 2 but I am still using 1
I still want to keep the original (more restrictive) question and cyraxjoe's answer, since I find those solutions very useful.
I'll explain two approaches that will solve the issue.
The first one is plugin based.
Plugin based
It still needs a kind of synchronization. It only works because there is only one BackgroundTask making the modifications (and the modification is just an atomic operation).
import time
import threading

import cherrypy
from cherrypy.process import plugins

UPDATE_INTERVAL = 0.5
REQUEST_DELAY = 0.1
UPDATE_DELAY = 0.1
THREAD_POOL_SIZE = 20

next_data = 1

class DataGateway(plugins.SimplePlugin):

    def __init__(self, bus):
        super(DataGateway, self).__init__(bus)
        self.data = next_data

    def start(self):
        self.bus.log("Starting DataGateway")
        self.bus.subscribe('dg:get', self._get_data)
        self.bus.subscribe('dg:update', self._update_data)
        self.bus.log("DataGateway has been started")

    def stop(self):
        self.bus.log("Stopping DataGateway")
        self.bus.unsubscribe('dg:get', self._get_data)
        self.bus.unsubscribe('dg:update', self._update_data)
        self.bus.log("DataGateway has been stopped")

    def _update_data(self, new_val):
        self.bus.log("Changing data, buffering should start!")
        self.data = new_val
        time.sleep(UPDATE_DELAY)
        self.bus.log("Continue serving buffered and new requests.")

    def _get_data(self):
        return self.data

def processData():
    """Background task that works for half an hour three times a day;
    when it finishes, it publishes the result on the engine bus."""
    global next_data
    cherrypy.engine.publish("dg:update", next_data)
    next_data += 1

class DataPublisher(object):

    @property
    def data(self):
        return cherrypy.engine.publish('dg:get').pop()

    @cherrypy.expose
    def index(self):
        result = "I get " + str(self.data)
        cherrypy.engine.log(result)
        time.sleep(REQUEST_DELAY)
        return result

if __name__ == '__main__':
    conf = {
        'global': {
            'server.thread_pool': THREAD_POOL_SIZE,
            'server.socket_host': '127.0.0.1',
            'server.socket_port': 8080,
        }
    }
    cherrypy.config.update(conf)
    DataGateway(cherrypy.engine).subscribe()
    plugins.BackgroundTask(UPDATE_INTERVAL, processData).start()
    cherrypy.quickstart(DataPublisher())
In this version the synchronization comes from the fact that both read and write operations are executed on the cherrypy.engine thread. Everything is abstracted in the DataGateway plugin; you operate on it just by publishing into the engine.
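The serialization idea can be sketched without CherryPy: funnel every read and write through a single worker thread via a queue, so no lock on the data itself is needed (gateway, update_data and get_data are illustrative names, not CherryPy API):

```python
import queue
import threading

requests = queue.Queue()

def gateway():
    """Single owner of `data`: because one thread handles both reads and
    writes, they are serialized without any lock on the data itself."""
    data = 0
    while True:
        op, payload, reply = requests.get()
        if op == 'update':
            data = payload
        elif op == 'get':
            reply.put(data)

threading.Thread(target=gateway, daemon=True).start()

def update_data(value):
    requests.put(('update', value, None))

def get_data():
    reply = queue.Queue(maxsize=1)
    requests.put(('get', None, reply))
    return reply.get()               # blocks until the gateway answers

update_data(41)
print(get_data())                    # 41: FIFO order guarantees the read sees
                                     # either the old or the new value, never a mix
```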
The second approach is to use a threading.Event object. This is a more manual approach, with the added benefit that it is probably going to be faster, given that reads do not execute over the cherrypy.engine thread.
threading.Event based (a.k.a. manual)
import time
import threading

import cherrypy
from cherrypy.process import plugins

UPDATE_INTERVAL = 0.5
REQUEST_DELAY = 0.1
UPDATE_DELAY = 0.1
THREAD_POOL_SIZE = 20

next_data = 1

def processData():
    """Background task that works for half an hour three times a day;
    when it finishes, it publishes the result on the engine bus."""
    global next_data
    cherrypy.engine.publish("doChangeData", next_data)
    next_data += 1

class DataPublisher(object):

    def __init__(self):
        self._data = next_data
        self._data_readable = threading.Event()
        self._data_readable.set()  # _data is initialized, so reads may proceed
        cherrypy.engine.subscribe('doChangeData', self.changeData)

    @property
    def data(self):
        if self._data_readable.is_set():
            return self._data
        else:
            self._data_readable.wait()
            return self.data

    @data.setter
    def data(self, value):
        self._data_readable.clear()
        time.sleep(UPDATE_DELAY)
        self._data = value
        self._data_readable.set()

    def changeData(self, newData):
        cherrypy.engine.log("Changing data, buffering should start!")
        self.data = newData
        cherrypy.engine.log("Continue serving buffered and new requests.")

    @cherrypy.expose
    def index(self):
        result = "I get " + str(self.data)
        cherrypy.engine.log(result)
        time.sleep(REQUEST_DELAY)
        return result

if __name__ == '__main__':
    conf = {
        'global': {
            'server.thread_pool': THREAD_POOL_SIZE,
            'server.socket_host': '127.0.0.1',
            'server.socket_port': 8080,
        }
    }
    cherrypy.config.update(conf)
    plugins.BackgroundTask(UPDATE_INTERVAL, processData).start()
    cherrypy.quickstart(DataPublisher())
I've added some niceties with the @property decorator, but the real gist is the threading.Event and the fact that the DataPublisher object is shared among the worker threads.
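Stripped of CherryPy, the pattern boils down to this (Gated and its methods are illustrative names):

```python
import threading
import time

class Gated:
    """Distillation of the pattern: reads block while an update is running."""

    def __init__(self, value):
        self._value = value
        self._readable = threading.Event()
        self._readable.set()             # value is initialized, reads may proceed

    def get(self):
        self._readable.wait()            # readers are buffered here during a swap
        return self._value

    def set(self, value):
        self._readable.clear()           # from now on readers wait
        time.sleep(0.05)                 # stand-in for the reference update
        self._value = value
        self._readable.set()             # release the buffered readers

g = Gated('old')
print(g.get())                           # old
g.set('new')
print(g.get())                           # new
```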
I also added the thread pool configuration required to increase the thread pool size in both examples. The default is 10.
As a way to test what I just said, you can execute this Python 3 script (if you don't have Python 3, now you have a pretext to install it). It will do about 100 requests, more or less concurrently depending on the thread pool.
Test script
import time
import urllib.request
import concurrent.futures

URL = 'http://localhost:8080/'
TIMEOUT = 60
DELAY = 0.05
MAX_WORKERS = 20
REQ_RANGE = range(1, 101)

def load_url():
    with urllib.request.urlopen(URL, timeout=TIMEOUT) as conn:
        return conn.read()

with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = {}
    for i in REQ_RANGE:
        print("Sending req {}".format(i))
        futures[executor.submit(load_url)] = i
        time.sleep(DELAY)

    results = []
    for future in concurrent.futures.as_completed(futures):
        try:
            data = future.result().decode()
        except Exception as exc:
            print(exc)
        else:
            results.append((futures[future], data))

curr_max = 0
for i, data in sorted(results, key=lambda r: r[0]):
    new_max = int(data.split()[-1])
    assert new_max >= curr_max, "The data was not updated correctly"
    print("Req {}: {}".format(i, data))
    curr_max = new_max
The way you determined that you have a problem, based on the log, is not trustworthy for this kind of problem, especially given that you don't control the time at which a request gets written to the "access" log. I couldn't make your code fail with my test script, but there is indeed a race condition in the general case. Your example works all the time only because the code is just making an atomic operation: a single attribute assignment, periodically, from a central point.
I hope the code is self-explanatory; if you have a question, leave a comment.
EDIT: I edited the plugin-based approach, because it only works while there is just one place executing the update. If you create another background task that updates the data, it could have problems as soon as you do anything more than a single assignment. Regardless, the code could be what you are looking for if you will only ever update from one BackgroundTask.
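If the update ever becomes more than a single attribute rebinding (say, two references that must stay consistent with each other), the atomicity argument no longer holds and an explicit lock is needed. A minimal sketch with made-up names:

```python
import threading

class Store:
    """When the update is two rebindings instead of one, readers could see
    a half-applied state; a lock restores atomicity for the pair."""

    def __init__(self):
        self._lock = threading.Lock()
        self._low, self._high = 0, 1

    def update(self, low, high):
        with self._lock:                 # both writes become one atomic step
            self._low = low
            self._high = high

    def snapshot(self):
        with self._lock:                 # both reads see the same version
            return self._low, self._high

store = Store()

def writer():
    for i in range(1000):
        store.update(i, i + 1)           # invariant: high == low + 1

t = threading.Thread(target=writer)
t.start()
for _ in range(1000):
    low, high = store.snapshot()
    assert high == low + 1               # never observes a torn update
t.join()
```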