I want to do a very simple thing: Launch a worker to something and then return the answer to user. I'm trying to do so using a combination of Flask and RQ.
import os
from flask import Flask, session
from somewhere import do_something
from rq import Queue
from worker import conn
app = Flask(__name__)
app.debug = True
app.secret_key = '....'
q = Queue(connection=conn)
@app.route('/make/')
def make():
job = q.enqueue(do_something, 'argument')
session['job'] = job
return 'Done'
@app.route('/get/')
def get():
try:
session['job'].refresh()
out = str(session['job'].result)
except:
out = 'No result yet'
return out
The idea in this very simple example is that people go to /make/ and the job starts. After some time there can go to /get/ and the result from the worker will be printed there.
However one line is causing problems:
session['job'] = job
It seems the job cannot be pickled, which is apparently used by the Flaks session. I'm getting the error:
...
10:52:16 web.1 | File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/flask/app.py", line 804, in save_session
10:52:16 web.1 | return self.session_interface.save_session(self, session, response)
10:52:16 web.1 | File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/flask/sessions.py", line 205, in save_session
10:52:16 web.1 | secure=secure, domain=domain)
10:52:16 web.1 | File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/werkzeug/contrib/securecookie.py", line 329, in save_cookie
10:52:16 web.1 | data = self.serialize(session_expires or expires)
10:52:16 web.1 | File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/werkzeug/contrib/securecookie.py", line 235, in serialize
10:52:16 web.1 | self.quote(value)
10:52:16 web.1 | File "/Users/julius/twitter-sentiment/venv/lib/python2.7/site-packages/werkzeug/contrib/securecookie.py", line 192, in quote
10:52:16 web.1 | value = cls.serialization_method.dumps(value)
10:52:16 web.1 | File "/Users/julius/twitter-sentiment/venv/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex
10:52:16 web.1 | raise TypeError, "can't pickle %s objects" % base.__name__
10:52:16 web.1 | TypeError: can't pickle function objects
I really hope something can help. I might be doing this in a completely wrong way (with passing the job via a session), but I have no idea how else to access the result of the job...
Any help will be highly appreciated.
Thanks in advance.
RQ (Redis Queue) is a simple Python library for queueing jobs and processing them in the background with workers. It is backed by Redis and it is designed to have a low barrier to entry. It can be integrated in your web stack easily.
If, at any time, the worker receives SIGINT (via Ctrl+C) or SIGTERM (via kill ), the worker wait until the currently running task is finished, stop the work loop and gracefully register its own death.
Simple as dead support of Redis database for Flask apps. Initialize Redis extension for Flask application. If app argument provided then initialize redis connection using application config values. If no app argument provided you should do initialization later with init_app() method.
I've not used rq
before but I see that a job has a .key
property. It might be easier to store that hash in your session. Then you can use the Job
class's .fetch
method which will itself call a .refresh()
and return the job to you. Reading the .result()
at that point would give you the job's current status.
Maybe like this (untested):
from rq.job import Job
@app.route('/make/')
def make():
job = q.enqueue(do_something, 'argument')
session['job'] = job.key
return 'Done'
@app.route('/get/')
def get():
try:
job = Job()
job.fetch(session['job'])
out = str(job.result)
except:
out = 'No result yet'
return out
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With