Python web service subscribed to reactive source produces strange behavior in object

I have implemented a web service using Falcon. This service stores a state machine (pytransitions) that is passed to the service's resources in their constructor. The service runs under gunicorn.

The web service launches a process on start using RxPy. The event returned in the on_next(event) is used to trigger a transition in the state machine.

THE BUG

I expect the state machine to have a consistent state in both the service and the resources, but in the resources the state never seems to change.

We have a test that tries to reproduce this behavior, but surprisingly the test passes:

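# Imports assumed from the names used below (Falcon, RxPY 1.x, pytransitions).
import falcon
from falcon import testing
from rx import Observable
from rx.concurrency import ThreadPoolScheduler
from time import sleep, time
from transitions.extensions import MachineFactory


class TochoLevel(object):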

    def __init__(self, tochine):
        self.tochine = tochine

    def on_get(self, req, res):
        res.status = falcon.HTTP_200
        res.body = self.tochine.state


def get_machine():
    states = ["low", "medium", "high"]

    transitions = [
        {'trigger': 'to_medium', 'source': ['low', 'medium', 'high'], 'dest': 'medium'},
        {'trigger': 'to_high', 'source': ['low', 'medium', 'high'], 'dest': 'high'},
        {'trigger': 'to_low', 'source': ['low', 'medium', 'high'], 'dest': 'low'}
    ]

    locked_factory = MachineFactory.get_predefined(locked=True)

    return locked_factory(
        states=states,
        transitions=transitions,
        initial='low',
        auto_transitions=False,
        queued=False
    )

def _level_observable(observer):
    for i in range(1, 21):
        sleep(0.1)
        next_val = 'to_low'

        if 8 <= i <= 15:
            next_val = 'to_medium'
        elif i > 15:
            next_val = 'to_high'
        observer.on_next(next_val)

    observer.on_completed()

def get_level_observable():
    return Observable.create(_level_observable)

class NotBlockingService(falcon.API):
    def __init__(self):
        super(NotBlockingService, self).__init__()

        self.tochine = get_machine()
        self.add_route('/tochez', TochoLevel(self.tochine))

    def _run_machine(self, val):
        self.tochine.trigger(val)
        print('machine exec: {}, state: {}'.format(val, self.tochine.state))
        return self.tochine.state

    def start(self):
        source = get_level_observable()
        (source.subscribe_on(ThreadPoolScheduler(2))
            .subscribe(self._run_machine))


def test_can_query_falcon_service_while_being_susbcribed_as_observer():

    svc = NotBlockingService()
    client = testing.TestClient(svc)

    assert client.simulate_get('/tochez').text == 'low'

    start = time()
    svc.start()
    sleep(1.2)

    assert client.simulate_get('/tochez').text == 'medium'
    end = time()

    sleep(1.2)

    assert client.simulate_get('/tochez').text == 'high'
    assert (end - start) < 2

THE QUESTION

Why does the state machine not change state in the TochoLevel resource when I launch the service with gunicorn and propagate the states in RxPy's on_next method?

Jav_Rock asked May 14 '18 06:05

1 Answer

When you run your service in development mode, you are most likely using a single process. When you use a server such as Gunicorn, you are using a pre-fork strategy, which is designed to give a reliable service in a production environment.

A pre-fork server spawns several subprocesses to handle requests. Each subprocess runs independently with its own copy of memory, so a state change made in one process is not visible in the others.
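The isolation described above can be sketched with a plain `os.fork()` call. This is only an illustration under stated assumptions: `Level` is a hypothetical stand-in for the state machine, and the fork is done by hand rather than by Gunicorn. It runs on Unix only.

```python
import os


class Level:
    """Hypothetical stand-in for the state machine: it only holds a state string."""

    def __init__(self):
        self.state = "low"


def states_after_fork():
    """Fork once, change the state in the child, and return (parent_state, child_state)."""
    machine = Level()
    read_fd, write_fd = os.pipe()

    pid = os.fork()
    if pid == 0:
        # Child process: mutate its own copy of the object and report the result.
        os.close(read_fd)
        machine.state = "high"
        os.write(write_fd, machine.state.encode("utf-8"))
        os.close(write_fd)
        os._exit(0)  # leave immediately without running the parent's code

    # Parent process: read what the child saw, then look at its own copy.
    os.close(write_fd)
    with os.fdopen(read_fd) as reader:
        child_state = reader.read()
    os.waitpid(pid, 0)
    return machine.state, child_state


if __name__ == "__main__":
    print(states_after_fork())
```

The child sees its own change, while the parent's copy of the object keeps the initial state. The same applies to any two processes that each hold their own state machine.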

Gunicorn relies on the standard WSGI application interface in Python (PEP 333 for Python 2, PEP 3333 for Python 3), so it receives an application object. It launches as many processes as its configuration indicates. Gunicorn calls these processes workers, and by default it uses 1 worker. Each worker loads the application object once and keeps its own copy of the application's state, separate from every other worker and from any process you start yourself.
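As a rough illustration of what "an application object" means here, the sketch below defines a minimal WSGI callable in the PEP 3333 style. `StatefulApp` and its `state` attribute are hypothetical names, not part of Falcon or Gunicorn. The response includes the process id, so requests answered by different workers would show different values.

```python
import os


class StatefulApp:
    """Hypothetical minimal WSGI application that keeps state in memory."""

    def __init__(self):
        self.state = "low"

    def __call__(self, environ, start_response):
        # Report which process answered and what state that process holds.
        body = "pid={} state={}".format(os.getpid(), self.state).encode("utf-8")
        headers = [
            ("Content-Type", "text/plain"),
            ("Content-Length", str(len(body))),
        ]
        start_response("200 OK", headers)
        return [body]


# Module-level object that a WSGI server would import and call.
app = StatefulApp()
```

Because the object lives in the memory of whichever process imported the module, changing `app.state` in one process does not affect the object held by another.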

This is why your state machine does not appear to keep its state.

💡 Tip: First try launching Gunicorn with 1 worker and check whether the state machine keeps its state. If it does, the second problem to solve is synchronizing the state machine across all the workers.
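One way to pin the worker count for this experiment is a Gunicorn configuration file, which is itself plain Python. The sketch below assumes a file named `gunicorn.conf.py` passed to Gunicorn with the `-c` option; the bind address is only an example value.

```python
# gunicorn.conf.py (assumed file name), passed to Gunicorn with -c

# Use a single worker process so every request is served by the same process.
workers = 1

# Example address only; adjust to your environment.
bind = "127.0.0.1:8000"
```

With a single worker, every request reaches the same process, so any state that process holds is the state every response reflects.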

RDCH106 answered Oct 28 '22 11:10
