
CherryPy: how to stop and buffer incoming request while data is updated

I'm working with CherryPy on a server that implements a RESTful-like API. The responses involve heavy computation that takes about 2 seconds per request. This computation uses some data that is updated three times a day.

The data is updated in the background (it takes about half an hour), and once it is updated, the references to the new data are passed to the functions that respond to the requests. This takes just a millisecond.

What I need is to be sure that each request is answered either with the old data or with the new data, but no request processing can take place while the data references are being changed. Ideally, I would like to find a way of buffering incoming requests while the data references are changed, and also to ensure that the references are changed only after all in-process requests have finished.

My current (not) working minimal example is as follows:

import time
import cherrypy
from cherrypy.process import plugins

theData = 0

def processData():
    """Background task that works for half an hour three times a day;
    when it finishes, it publishes the new data on the engine bus."""
    global theData  # using a global variable to simplify the example
    theData += 1
    cherrypy.engine.publish("doChangeData", theData)

class DataPublisher(object):

    def __init__(self):
        self.data = 'initData'
        cherrypy.engine.subscribe('doChangeData', self.changeData)

    def changeData(self, newData):
        cherrypy.engine.log("Changing data, buffering should start!")
        self.data = newData
        time.sleep(1)  # exaggeration of the ~1 ms reference update, to make the problem visible
        cherrypy.engine.log("Continue serving buffered and new requests.")

    @cherrypy.expose
    def index(self):
        result = "I get "+str(self.data)
        cherrypy.engine.log(result)
        time.sleep(3) 
        return result

if __name__ == '__main__':
    conf = {
         '/': { 'server.socket_host': '127.0.0.1',
                'server.socket_port': 8080} 
        }
    cherrypy.config.update(conf)

    btask = plugins.BackgroundTask(5, processData) #5 secs for the example                          
    btask.start()

    cherrypy.quickstart(DataPublisher())

If I run this script, open localhost:8080 in a browser, and refresh the page repeatedly, I get:

...
[17/Sep/2015:21:32:41] ENGINE Changing data, buffering should start!
127.0.0.1 - - [17/Sep/2015:21:32:41] "GET / HTTP/1.1" 200 7 "... 
[17/Sep/2015:21:32:42] ENGINE I get 3
[17/Sep/2015:21:32:42] ENGINE Continue serving buffered and new requests.
127.0.0.1 - - [17/Sep/2015:21:24:44] "GET / HTTP/1.1" 200 7 "...
...

This means that the processing of some requests starts before the data references begin to change and ends after the change. I want to avoid both cases. Something like:

...
127.0.0.1 - - [17/Sep/2015:21:32:41] "GET / HTTP/1.1" 200 7 "... 
[17/Sep/2015:21:32:41] ENGINE Changing data, buffering should start!
[17/Sep/2015:21:32:42] ENGINE Continue serving buffered and new requests.
[17/Sep/2015:21:32:42] ENGINE I get 3
127.0.0.1 - - [17/Sep/2015:21:24:44] "GET / HTTP/1.1" 200 7 "...
...

I searched the documentation and the web and found these references, which do not completely cover this case:

http://www.defuze.org/archives/198-managing-your-process-with-the-cherrypy-bus.html

How to execute asynchronous post-processing in CherryPy?

http://tools.cherrypy.org/wiki/BackgroundTaskQueue

Cherrypy : which solutions for pages with large processing time

How to stop request processing in Cherrypy?

Update (with a simple solution):

After giving it more thought, I think the question is misleading, since it includes some implementation requirements in the question itself, namely: stop processing and start buffering. The requirement can be simplified to: make sure that each request is processed either with the old data or with the new data.

For the latter, it is enough to store a temporary local reference to the data in use. This reference can be used throughout the request processing, and it is no problem if another thread changes self.data in the meantime; for Python objects, the garbage collector takes care of the old data.

Specifically, it is enough to change the index function by:

@cherrypy.expose
def index(self):
    tempData = self.data
    result = "I started with %s"%str(tempData)
    time.sleep(3) # Heavy use of tempData
    result += " that changed to %s"%str(self.data)
    result += " but I am still using %s"%str(tempData)
    cherrypy.engine.log(result)
    return result

And as a result we will see:

[21/Sep/2015:10:06:00] ENGINE I started with 1 that changed to 2 but I am still using 1
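The snapshot idea can be demonstrated without CherryPy at all, using plain threads (the names here are illustrative, not part of the code above):

```python
import threading
import time

class Holder:
    def __init__(self):
        self.data = [1]  # the published dataset (illustrative)

holder = Holder()
seen = []

def reader():
    snapshot = holder.data      # take a local reference once, like tempData
    time.sleep(0.2)             # simulate heavy use of the snapshot
    # record what the snapshot holds vs. what the attribute now points to
    seen.append((snapshot[0], holder.data[0]))

t = threading.Thread(target=reader)
t.start()
time.sleep(0.05)
holder.data = [2]               # the writer swaps in new data mid-request
t.join()
print(seen)  # [(1, 2)]: the reader kept using the old object throughout
```

The old list stays alive as long as `snapshot` references it; the garbage collector reclaims it only after the request finishes.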

I still want to keep the original (more restrictive) question and cyraxjoe answer too, since I find those solutions very useful.

Asked Oct 20 '22 by eguaio
1 Answer

I'll explain two approaches that solve the issue. The first one is plugin based.

Plugin based

This still needs a kind of synchronization. It only works because there is a single BackgroundTask making the modifications, and the update is just an atomic assignment.

import time
import threading

import cherrypy
from cherrypy.process import plugins

UPDATE_INTERVAL = 0.5
REQUEST_DELAY = 0.1
UPDATE_DELAY = 0.1
THREAD_POOL_SIZE = 20

next_data = 1

class DataGateway(plugins.SimplePlugin):

    def __init__(self, bus):
        super(DataGateway, self).__init__(bus)
        self.data = next_data

    def start(self):
        self.bus.log("Starting DataGateway")
        self.bus.subscribe('dg:get', self._get_data)
        self.bus.subscribe('dg:update', self._update_data)
        self.bus.log("DataGateway has been started")

    def stop(self):
        self.bus.log("Stopping DataGateway")
        self.bus.unsubscribe('dg:get', self._get_data)
        self.bus.unsubscribe('dg:update', self._update_data)
        self.bus.log("DataGateway has been stopped")

    def _update_data(self, new_val):
        self.bus.log("Changing data, buffering should start!")
        self.data = new_val
        time.sleep(UPDATE_DELAY)
        self.bus.log("Continue serving buffered and new requests.")

    def _get_data(self):
        return self.data


def processData():
    """Background task that works for half an hour three times a day;
    when it finishes, it publishes the new data on the engine bus."""
    global next_data
    cherrypy.engine.publish("dg:update", next_data)
    next_data += 1


class DataPublisher(object):

    @property
    def data(self):
        return cherrypy.engine.publish('dg:get').pop()

    @cherrypy.expose
    def index(self):
        result = "I get " + str(self.data)
        cherrypy.engine.log(result)
        time.sleep(REQUEST_DELAY)
        return result

if __name__ == '__main__':
    conf = {
        'global': {
            'server.thread_pool': THREAD_POOL_SIZE,
            'server.socket_host': '127.0.0.1',
            'server.socket_port': 8080,
        }
    }
    cherrypy.config.update(conf)
    DataGateway(cherrypy.engine).subscribe()
    plugins.BackgroundTask(UPDATE_INTERVAL, processData).start()
    cherrypy.quickstart(DataPublisher())

In this version the synchronization comes from the fact that both the read and the write operations are executed on the cherrypy.engine thread. Everything is abstracted in the DataGateway plugin; you operate it just by publishing on the engine bus.
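The serialize-everything-through-one-thread idea behind the plugin can be sketched with nothing but the standard library (a toy stand-in for the engine bus; all names are illustrative):

```python
import queue
import threading

# a toy "bus": one worker thread executes every submitted operation,
# so reads and writes of the shared state can never interleave
tasks = queue.Queue()
state = {'data': 1}

def worker():
    while True:
        func, reply = tasks.get()
        if func is None:        # shutdown sentinel
            break
        reply.put(func())

def call(func):
    """Run func on the worker thread and block for its result."""
    reply = queue.Queue()
    tasks.put((func, reply))
    return reply.get()

t = threading.Thread(target=worker)
t.start()

first = call(lambda: state['data'])    # read, executed on the worker
call(lambda: state.update(data=2))     # write, serialized with the reads
second = call(lambda: state['data'])
print(first, second)  # 1 2

tasks.put((None, None))                # stop the worker
t.join()
```

Any number of request threads can use `call` concurrently; because the worker runs one operation at a time, no read can ever observe a half-finished update.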

The second approach uses a threading.Event object. It is more manual, but has the added benefit that it is probably faster: reads don't have to be routed through the cherrypy.engine thread.

threading.Event based (a.k.a. manual)

import time
import cherrypy
import threading
from cherrypy.process import plugins

UPDATE_INTERVAL = 0.5
REQUEST_DELAY = 0.1
UPDATE_DELAY = 0.1
THREAD_POOL_SIZE = 20

next_data = 1

def processData():
    """Background task that works for half an hour three times a day;
    when it finishes, it publishes the new data on the engine bus."""
    global next_data
    cherrypy.engine.publish("doChangeData", next_data)
    next_data += 1


class DataPublisher(object):

    def __init__(self):
        self._data = next_data
        self._data_readable = threading.Event()
        cherrypy.engine.subscribe('doChangeData', self.changeData)

    @property
    def data(self):
        if self._data_readable.is_set():
            return self._data
        else:
            self._data_readable.wait()
            return self.data

    @data.setter
    def data(self, value):
        self._data_readable.clear()
        time.sleep(UPDATE_DELAY)
        self._data = value
        self._data_readable.set()

    def changeData(self, newData):
        cherrypy.engine.log("Changing data, buffering should start!")
        self.data = newData
        cherrypy.engine.log("Continue serving buffered and new requests.")

    @cherrypy.expose
    def index(self):
        result = "I get " + str(self.data)
        cherrypy.engine.log(result)
        time.sleep(REQUEST_DELAY)
        return result

if __name__ == '__main__':
    conf = {
        'global': {
            'server.thread_pool': THREAD_POOL_SIZE,
            'server.socket_host': '127.0.0.1',
            'server.socket_port': 8080,
        }
    }
    cherrypy.config.update(conf)
    plugins.BackgroundTask(UPDATE_INTERVAL, processData).start()
    cherrypy.quickstart(DataPublisher())

I've added some niceties with the @property decorator, but the real gist is in the threading.Event and the fact that the DataPublisher object is shared among the worker threads.
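The gating pattern can be shown in isolation with plain threading (illustrative names, same clear/sleep/set sequence as the setter above):

```python
import threading
import time

gate = threading.Event()
gate.set()                      # data is readable initially
shared = {'data': 1}
reads = []

def read():
    gate.wait()                 # block while an update is in progress
    reads.append(shared['data'])

def update(value):
    gate.clear()                # stop new reads from proceeding
    time.sleep(0.1)             # simulate the slow reference swap
    shared['data'] = value
    gate.set()                  # let the buffered readers through

u = threading.Thread(target=update, args=(2,))
u.start()
time.sleep(0.02)                # the update is now in progress
r = threading.Thread(target=read)
r.start()                       # this read blocks until the update finishes
u.join()
r.join()
print(reads)  # [2]: the read never observed the half-finished state
```

Readers that arrive during the update simply park on `gate.wait()`, which is exactly the "buffering" behavior asked for in the question.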

In both examples I also added the configuration needed to increase the thread pool size (the default is 10).

As a way to test what I just said, you can execute this Python 3 script (if you don't have python3, now you have a pretext to install it). It makes 100 requests, more or less concurrently depending on the thread pool.

Test script

import time
import urllib.request
import concurrent.futures


URL = 'http://localhost:8080/'
TIMEOUT = 60
DELAY = 0.05
MAX_WORKERS = 20
REQ_RANGE = range(1, 101)

def load_url():
    with urllib.request.urlopen(URL, timeout=TIMEOUT) as conn:
        return conn.read()


with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = {}
    for i in REQ_RANGE:
        print("Sending req {}".format(i))
        futures[executor.submit(load_url)] = i
        time.sleep(DELAY)
    results = []
    for future in concurrent.futures.as_completed(futures):
        try:
            data = future.result().decode()
        except Exception as exc:
            print(exc)
        else:
            results.append((futures[future], data))
    curr_max = 0
    for i, data in sorted(results, key=lambda r: r[0]):
        new_max = int(data.split()[-1])
        assert new_max >= curr_max, "The data was not updated correctly"
        print("Req {}: {}".format(i, data))
        curr_max = new_max

The way you determined that you have a problem, based on the log, is not trustworthy for this kind of issue, especially since you don't control the time at which a request gets written to the "access" log. I couldn't make your code fail with my test script, but there is indeed a race condition in the general case. In this example it should work all the time, because the code is just performing an atomic operation: a single attribute assignment, done periodically from a central point.
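To make the distinction concrete, a quick sketch: a single attribute or name assignment is effectively atomic under CPython's GIL, but any read-modify-write step, such as an increment, is not and needs a lock (names here are illustrative):

```python
import threading

counter = 0
lock = threading.Lock()

def unsafe_inc(n):
    # shown for contrast: counter += 1 is a read-modify-write,
    # so concurrent unsafe_inc threads can lose updates
    global counter
    for _ in range(n):
        counter += 1

def safe_inc(n):
    global counter
    for _ in range(n):
        with lock:
            counter += 1    # the lock makes the whole step atomic

threads = [threading.Thread(target=safe_inc, args=(100_000,))
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 400000, guaranteed with the lock
```

That is why the plugin version above gets away without a lock: the BackgroundTask only ever performs one plain assignment at a time.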

I hope the code is self-explanatory; if you have a question, leave a comment.

EDIT: I edited the plugin based approach, because it only works while there is just one place driving the plugin; if you create another background task that updates the data, it could have problems as soon as you do anything more than a plain assignment. Regardless, the code could be what you are looking for if you will only ever update from a single BackgroundTask.

Answered Oct 30 '22 by cyraxjoe