Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Flask: passing around background worker job (rq, redis)

I want to do a very simple thing: Launch a worker to something and then return the answer to user. I'm trying to do so using a combination of Flask and RQ.

import os
from flask import Flask, session
from somewhere import do_something
from rq import Queue
from worker import conn

app = Flask(__name__)
app.debug = True
app.secret_key = '....'

q = Queue(connection=conn)

@app.route('/make/')
def make():
    job = q.enqueue(do_something, 'argument')
    session['job'] = job
    return 'Done'

@app.route('/get/')
def get():
    try:
        session['job'].refresh()
        out = str(session['job'].result)
    except:
        out = 'No result yet'
    return out

The idea in this very simple example is that people go to /make/ and the job starts. After some time there can go to /get/ and the result from the worker will be printed there.

However one line is causing problems:

session['job'] = job

It seems the job cannot be pickled, which is apparently used by the Flaks session. I'm getting the error:

...
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/flask/app.py", line 804, in save_session
10:52:16 web.1     |     return self.session_interface.save_session(self, session, response)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/flask/sessions.py", line 205, in save_session
10:52:16 web.1     |     secure=secure, domain=domain)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/werkzeug/contrib/securecookie.py", line 329, in save_cookie
10:52:16 web.1     |     data = self.serialize(session_expires or expires)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/werkzeug/contrib/securecookie.py", line 235, in serialize
10:52:16 web.1     |     self.quote(value)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/werkzeug/contrib/securecookie.py", line 192, in quote
10:52:16 web.1     |     value = cls.serialization_method.dumps(value)
10:52:16 web.1     |   File "/Users/julius/twitter-sentiment/venv/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
10:52:16 web.1     |     raise TypeError, "can't pickle %s objects" % base.__name__
10:52:16 web.1     | TypeError: can't pickle function objects

I really hope something can help. I might be doing this in a completely wrong way (with passing the job via a session), but I have no idea how else to access the result of the job...

Any help will be highly appreciated.

Thanks in advance.

like image 453
Julius Avatar asked Aug 28 '12 14:08

Julius


People also ask

What is Redis RQ?

RQ (Redis Queue) is a simple Python library for queueing jobs and processing them in the background with workers. It is backed by Redis and it is designed to have a low barrier to entry. It can be integrated in your web stack easily.

How do I stop RQ workers?

If, at any time, the worker receives SIGINT (via Ctrl+C) or SIGTERM (via kill ), the worker wait until the currently running task is finished, stop the work loop and gracefully register its own death.

How does Redis integrate with Flask?

Simple as dead support of Redis database for Flask apps. Initialize Redis extension for Flask application. If app argument provided then initialize redis connection using application config values. If no app argument provided you should do initialization later with init_app() method.


1 Answers

I've not used rq before but I see that a job has a .key property. It might be easier to store that hash in your session. Then you can use the Job class's .fetch method which will itself call a .refresh() and return the job to you. Reading the .result() at that point would give you the job's current status.

Maybe like this (untested):

from rq.job import Job

@app.route('/make/')
def make():
    job = q.enqueue(do_something, 'argument')
    session['job'] = job.key
    return 'Done'

@app.route('/get/')
def get():
    try:
        job = Job()
        job.fetch(session['job'])
        out = str(job.result)
    except:
        out = 'No result yet'
    return out
like image 173
aezell Avatar answered Sep 18 '22 04:09

aezell