Streaming Connection Using Python Bottle, Multiprocessing, and gevent

I have a Bottle application that uses subprocesses to do most of the work for requests. For routes that return a single response, I do something like what's below.

@route('/index')
def index():
    worker = getWorker()
    return worker.doStuff()

One of my routes needs to be a data stream. I can't figure out a smart way to have the worker return a response stream. The example below is similar to what I want to do, only without a worker.

@route('/stream')
def stream():
    yield 'START'
    sleep(3)
    yield 'MIDDLE'
    sleep(5)
    yield 'END'

I'd like to be able to do something like the example below. Since I can't yield or return a generator from the route, it doesn't work this way.

@route('/stream')
def stream():
    worker = getWorker()
    yield worker.doStuff()

class worker:
    # Remember, this is run in a subprocess in real life.
    def doStuff(self):
        yield 'START'
        sleep(3)
        yield 'MIDDLE'
        sleep(5)
        yield 'END'

This is for a large project and I don't have much flexibility in the way that I do things. I know sometimes the easiest answer is "your design is wrong." In this case though, I have some constraints that are beyond my control (the route has to be a data stream and the work has to be done by a subprocess).

EDIT: I also can't have doStuff() block. I'd like to create something like a gevent queue that I return from the route and that the worker process writes to. The problem now is that it doesn't seem like I can use gevent.queue and Process together.

@route('/stream')
def index():
    body = gevent.queue.Queue()
    # args must be a tuple, hence the trailing comma
    worker = multiprocessing.Process(target=do_stuff, args=(body,))
    worker.start()
    return body

def do_stuff(body):
    while True:
        gevent.sleep(5)
        body.put("data")

Alex Wade asked Nov 10 '12 02:11


2 Answers

After a lot of research and experimenting, I've determined that a gevent queue can't be used with Python multiprocessing in this way. Instead, something like Redis can be used to let the processes and gevent greenlets communicate.

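import multiprocessing
import gevent
import redis
from bottle import route

redis_server = redis.Redis()  # assumes a Redis server on localhost:6379

@route('/stream')
def index():
    worker = multiprocessing.Process(target=do_stuff)
    worker.start()
    while True:
        # blpop blocks until an item arrives and returns a (key, value) pair
        yield redis_server.blpop("stream")[1]

def do_stuff():
    while True:
        gevent.sleep(5)
        # "stream" is an arbitrary list key; rpush + blpop gives FIFO order
        redis_server.rpush("stream", "data")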
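
For comparison, the same relay idea (a subprocess produces chunks, a generator in the web process hands them on) can be sketched with only the standard library. This is a minimal sketch, not the approach the answer settled on: it swaps Redis for a multiprocessing.Queue, the names stream_from_worker and _DONE are made up for illustration, and it assumes the Unix-only "fork" start method. Its out_queue.get() call is an ordinary blocking call, so under gevent it would be expected to stall other greenlets, which is the limitation that motivates using something like Redis.

```python
import multiprocessing
import time

# Assumption: the "fork" start method (Unix only) keeps this sketch short.
# With "spawn", module-level code would need an `if __name__ == "__main__":` guard.
_ctx = multiprocessing.get_context("fork")

_DONE = None  # hypothetical sentinel: the worker sends it when it has finished


def do_stuff(out_queue, delay):
    """Runs in the subprocess: push each chunk as soon as it is ready."""
    for chunk in ("START", "MIDDLE", "END"):
        out_queue.put(chunk)
        time.sleep(delay)
    out_queue.put(_DONE)


def stream_from_worker(delay=1.0):
    """Generator a route could return: relays chunks from the subprocess."""
    out_queue = _ctx.Queue()
    worker = _ctx.Process(target=do_stuff, args=(out_queue, delay))
    worker.start()
    try:
        while True:
            chunk = out_queue.get()  # blocks until the worker sends something
            if chunk is _DONE:
                break
            yield chunk
    finally:
        worker.join()  # wait for the subprocess to exit before finishing
```

A route body could then simply be `return stream_from_worker()`, since the result is an iterable of strings.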

Alex Wade answered Sep 27 '22 23:09


In your last example, worker.doStuff() returns a generator, which is iterable. You can just return that (change yield to return). Bottle accepts iterables as return values, as long as they yield byte or unicode strings.
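
The difference between yielding and returning the inner generator can be seen in plain Python, without Bottle. The sketch below uses hypothetical names (Worker, stream_with_yield, stream_with_return, stream_with_yield_from) and leaves out the @route decorator and the sleeps. The third variant, yield from, is not part of the answer above; it is a Python 3.3+ alternative for cases where the route itself must remain a generator.

```python
class Worker:
    def do_stuff(self):
        yield "START"
        yield "MIDDLE"
        yield "END"


def stream_with_yield():
    # Yields exactly one item: the generator object itself, not its strings.
    yield Worker().do_stuff()


def stream_with_return():
    # Returns the generator, so the caller iterates over the strings directly.
    return Worker().do_stuff()


def stream_with_yield_from():
    # Delegates to the inner generator while this function stays a generator.
    yield from Worker().do_stuff()
```

In a Bottle application, the body of stream_with_return would sit under @route('/stream'), and the framework would iterate over the returned generator to produce the response.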

defnull answered Sep 28 '22 00:09